MediaWiki  REL1_24
SquidPurgeClient.php
Go to the documentation of this file.
00001 <?php
00030 class SquidPurgeClient {
00032     protected $host;
00033 
00035     protected $port;
00036 
00038     protected $ip;
00039 
00041     protected $readState = 'idle';
00042 
00044     protected $writeBuffer = '';
00045 
00047     protected $requests = array();
00048 
00050     protected $currentRequestIndex;
00051 
00052     const EINTR = 4;
00053     const EAGAIN = 11;
00054     const EINPROGRESS = 115;
00055     const BUFFER_SIZE = 8192;
00056 
00061     protected $socket;
00062 
00064     protected $readBuffer;
00065 
00067     protected $bodyRemaining;
00068 
00073     public function __construct( $server, $options = array() ) {
00074         $parts = explode( ':', $server, 2 );
00075         $this->host = $parts[0];
00076         $this->port = isset( $parts[1] ) ? $parts[1] : 80;
00077     }
00078 
00085     protected function getSocket() {
00086         if ( $this->socket !== null ) {
00087             return $this->socket;
00088         }
00089 
00090         $ip = $this->getIP();
00091         if ( !$ip ) {
00092             $this->log( "DNS error" );
00093             $this->markDown();
00094             return false;
00095         }
00096         $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
00097         socket_set_nonblock( $this->socket );
00098         wfSuppressWarnings();
00099         $ok = socket_connect( $this->socket, $ip, $this->port );
00100         wfRestoreWarnings();
00101         if ( !$ok ) {
00102             $error = socket_last_error( $this->socket );
00103             if ( $error !== self::EINPROGRESS ) {
00104                 $this->log( "connection error: " . socket_strerror( $error ) );
00105                 $this->markDown();
00106                 return false;
00107             }
00108         }
00109 
00110         return $this->socket;
00111     }
00112 
00117     public function getReadSocketsForSelect() {
00118         if ( $this->readState == 'idle' ) {
00119             return array();
00120         }
00121         $socket = $this->getSocket();
00122         if ( $socket === false ) {
00123             return array();
00124         }
00125         return array( $socket );
00126     }
00127 
00132     public function getWriteSocketsForSelect() {
00133         if ( !strlen( $this->writeBuffer ) ) {
00134             return array();
00135         }
00136         $socket = $this->getSocket();
00137         if ( $socket === false ) {
00138             return array();
00139         }
00140         return array( $socket );
00141     }
00142 
00149     protected function getIP() {
00150         if ( $this->ip === null ) {
00151             if ( IP::isIPv4( $this->host ) ) {
00152                 $this->ip = $this->host;
00153             } elseif ( IP::isIPv6( $this->host ) ) {
00154                 throw new MWException( '$wgSquidServers does not support IPv6' );
00155             } else {
00156                 wfSuppressWarnings();
00157                 $this->ip = gethostbyname( $this->host );
00158                 if ( $this->ip === $this->host ) {
00159                     $this->ip = false;
00160                 }
00161                 wfRestoreWarnings();
00162             }
00163         }
00164         return $this->ip;
00165     }
00166 
00171     protected function markDown() {
00172         $this->close();
00173         $this->socket = false;
00174     }
00175 
00179     public function close() {
00180         if ( $this->socket ) {
00181             wfSuppressWarnings();
00182             socket_set_block( $this->socket );
00183             socket_shutdown( $this->socket );
00184             socket_close( $this->socket );
00185             wfRestoreWarnings();
00186         }
00187         $this->socket = null;
00188         $this->readBuffer = '';
00189         // Write buffer is kept since it may contain a request for the next socket
00190     }
00191 
00197     public function queuePurge( $url ) {
00198         global $wgSquidPurgeUseHostHeader;
00199         $url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
00200         $request = array();
00201         if ( $wgSquidPurgeUseHostHeader ) {
00202             $url = wfParseUrl( $url );
00203             $host = $url['host'];
00204             if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) {
00205                 $host .= ":" . $url['port'];
00206             }
00207             $path = $url['path'];
00208             if ( isset( $url['query'] ) && is_string( $url['query'] ) ) {
00209                 $path = wfAppendQuery( $path, $url['query'] );
00210             }
00211             $request[] = "PURGE $path HTTP/1.1";
00212             $request[] = "Host: $host";
00213         } else {
00214             $request[] = "PURGE $url HTTP/1.0";
00215         }
00216         $request[] = "Connection: Keep-Alive";
00217         $request[] = "Proxy-Connection: Keep-Alive";
00218         $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
00219         // Two ''s to create \r\n\r\n
00220         $request[] = '';
00221         $request[] = '';
00222 
00223         $this->requests[] = implode( "\r\n", $request );
00224         if ( $this->currentRequestIndex === null ) {
00225             $this->nextRequest();
00226         }
00227     }
00228 
00232     public function isIdle() {
00233         return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
00234     }
00235 
00239     public function doWrites() {
00240         if ( !strlen( $this->writeBuffer ) ) {
00241             return;
00242         }
00243         $socket = $this->getSocket();
00244         if ( !$socket ) {
00245             return;
00246         }
00247 
00248         if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
00249             $buf = $this->writeBuffer;
00250             $flags = MSG_EOR;
00251         } else {
00252             $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
00253             $flags = 0;
00254         }
00255         wfSuppressWarnings();
00256         $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
00257         wfRestoreWarnings();
00258 
00259         if ( $bytesSent === false ) {
00260             $error = socket_last_error( $socket );
00261             if ( $error != self::EAGAIN && $error != self::EINTR ) {
00262                 $this->log( 'write error: ' . socket_strerror( $error ) );
00263                 $this->markDown();
00264             }
00265             return;
00266         }
00267 
00268         $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
00269     }
00270 
00274     public function doReads() {
00275         $socket = $this->getSocket();
00276         if ( !$socket ) {
00277             return;
00278         }
00279 
00280         $buf = '';
00281         wfSuppressWarnings();
00282         $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
00283         wfRestoreWarnings();
00284         if ( $bytesRead === false ) {
00285             $error = socket_last_error( $socket );
00286             if ( $error != self::EAGAIN && $error != self::EINTR ) {
00287                 $this->log( 'read error: ' . socket_strerror( $error ) );
00288                 $this->markDown();
00289                 return;
00290             }
00291         } elseif ( $bytesRead === 0 ) {
00292             // Assume EOF
00293             $this->close();
00294             return;
00295         }
00296 
00297         $this->readBuffer .= $buf;
00298         while ( $this->socket && $this->processReadBuffer() === 'continue' );
00299     }
00300 
00305     protected function processReadBuffer() {
00306         switch ( $this->readState ) {
00307         case 'idle':
00308             return 'done';
00309         case 'status':
00310         case 'header':
00311             $lines = explode( "\r\n", $this->readBuffer, 2 );
00312             if ( count( $lines ) < 2 ) {
00313                 return 'done';
00314             }
00315             if ( $this->readState == 'status' ) {
00316                 $this->processStatusLine( $lines[0] );
00317             } else { // header
00318                 $this->processHeaderLine( $lines[0] );
00319             }
00320             $this->readBuffer = $lines[1];
00321             return 'continue';
00322         case 'body':
00323             if ( $this->bodyRemaining !== null ) {
00324                 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
00325                     $this->bodyRemaining -= strlen( $this->readBuffer );
00326                     $this->readBuffer = '';
00327                     return 'done';
00328                 } else {
00329                     $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
00330                     $this->bodyRemaining = 0;
00331                     $this->nextRequest();
00332                     return 'continue';
00333                 }
00334             } else {
00335                 // No content length, read all data to EOF
00336                 $this->readBuffer = '';
00337                 return 'done';
00338             }
00339         default:
00340             throw new MWException( __METHOD__ . ': unexpected state' );
00341         }
00342     }
00343 
00347     protected function processStatusLine( $line ) {
00348         if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
00349             $this->log( 'invalid status line' );
00350             $this->markDown();
00351             return;
00352         }
00353         list( , , , $status, $reason ) = $m;
00354         $status = intval( $status );
00355         if ( $status !== 200 && $status !== 404 ) {
00356             $this->log( "unexpected status code: $status $reason" );
00357             $this->markDown();
00358             return;
00359         }
00360         $this->readState = 'header';
00361     }
00362 
00366     protected function processHeaderLine( $line ) {
00367         if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
00368             $this->bodyRemaining = intval( $m[1] );
00369         } elseif ( $line === '' ) {
00370             $this->readState = 'body';
00371         }
00372     }
00373 
00374     protected function nextRequest() {
00375         if ( $this->currentRequestIndex !== null ) {
00376             unset( $this->requests[$this->currentRequestIndex] );
00377         }
00378         if ( count( $this->requests ) ) {
00379             $this->readState = 'status';
00380             $this->currentRequestIndex = key( $this->requests );
00381             $this->writeBuffer = $this->requests[$this->currentRequestIndex];
00382         } else {
00383             $this->readState = 'idle';
00384             $this->currentRequestIndex = null;
00385             $this->writeBuffer = '';
00386         }
00387         $this->bodyRemaining = null;
00388     }
00389 
00393     protected function log( $msg ) {
00394         wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" );
00395     }
00396 }
00397 
00398 class SquidPurgeClientPool {
00400     protected $clients = array();
00401 
00403     protected $timeout = 5;
00404 
00408     function __construct( $options = array() ) {
00409         if ( isset( $options['timeout'] ) ) {
00410             $this->timeout = $options['timeout'];
00411         }
00412     }
00413 
00418     public function addClient( $client ) {
00419         $this->clients[] = $client;
00420     }
00421 
00422     public function run() {
00423         $done = false;
00424         $startTime = microtime( true );
00425         while ( !$done ) {
00426             $readSockets = $writeSockets = array();
00430             foreach ( $this->clients as $clientIndex => $client ) {
00431                 $sockets = $client->getReadSocketsForSelect();
00432                 foreach ( $sockets as $i => $socket ) {
00433                     $readSockets["$clientIndex/$i"] = $socket;
00434                 }
00435                 $sockets = $client->getWriteSocketsForSelect();
00436                 foreach ( $sockets as $i => $socket ) {
00437                     $writeSockets["$clientIndex/$i"] = $socket;
00438                 }
00439             }
00440             if ( !count( $readSockets ) && !count( $writeSockets ) ) {
00441                 break;
00442             }
00443             $exceptSockets = null;
00444             $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
00445             wfSuppressWarnings();
00446             $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
00447             wfRestoreWarnings();
00448             if ( $numReady === false ) {
00449                 wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' .
00450                     socket_strerror( socket_last_error() ) . "\n" );
00451                 break;
00452             }
00453             // Check for timeout, use 1% tolerance since we aimed at having socket_select()
00454             // exit at precisely the overall timeout
00455             if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
00456                 wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" );
00457                 break;
00458             } elseif ( !$numReady ) {
00459                 continue;
00460             }
00461 
00462             foreach ( $readSockets as $key => $socket ) {
00463                 list( $clientIndex, ) = explode( '/', $key );
00464                 $client = $this->clients[$clientIndex];
00465                 $client->doReads();
00466             }
00467             foreach ( $writeSockets as $key => $socket ) {
00468                 list( $clientIndex, ) = explode( '/', $key );
00469                 $client = $this->clients[$clientIndex];
00470                 $client->doWrites();
00471             }
00472 
00473             $done = true;
00474             foreach ( $this->clients as $client ) {
00475                 if ( !$client->isIdle() ) {
00476                     $done = false;
00477                 }
00478             }
00479         }
00480         foreach ( $this->clients as $client ) {
00481             $client->close();
00482         }
00483     }
00484 }