MediaWiki  REL1_20
SquidPurgeClient.php
Go to the documentation of this file.
00001 <?php
00030 class SquidPurgeClient {
00031         var $host, $port, $ip;
00032 
00033         var $readState = 'idle';
00034         var $writeBuffer = '';
00035         var $requests = array();
00036         var $currentRequestIndex;
00037 
00038         const EINTR = 4;
00039         const EAGAIN = 11;
00040         const EINPROGRESS = 115;
00041         const BUFFER_SIZE = 8192;
00042 
00046         var $socket;
00047 
00048         var $readBuffer;
00049 
00050         var $bodyRemaining;
00051 
00056         public function __construct( $server, $options = array() ) {
00057                 $parts = explode( ':', $server, 2 );
00058                 $this->host = $parts[0];
00059                 $this->port = isset( $parts[1] ) ? $parts[1] : 80;
00060         }
00061 
00068         protected function getSocket() {
00069                 if ( $this->socket !== null ) {
00070                         return $this->socket;
00071                 }
00072 
00073                 $ip = $this->getIP();
00074                 if ( !$ip ) {
00075                         $this->log( "DNS error" );
00076                         $this->markDown();
00077                         return false;
00078                 }
00079                 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
00080                 socket_set_nonblock( $this->socket );
00081                 wfSuppressWarnings();
00082                 $ok = socket_connect( $this->socket, $ip, $this->port );
00083                 wfRestoreWarnings();
00084                 if ( !$ok ) {
00085                         $error = socket_last_error( $this->socket );
00086                         if ( $error !== self::EINPROGRESS ) {
00087                                 $this->log( "connection error: " . socket_strerror( $error ) );
00088                                 $this->markDown();
00089                                 return false;
00090                         }
00091                 }
00092 
00093                 return $this->socket;
00094         }
00095 
00100         public function getReadSocketsForSelect() {
00101                 if ( $this->readState == 'idle' ) {
00102                         return array();
00103                 }
00104                 $socket = $this->getSocket();
00105                 if ( $socket === false ) {
00106                         return array();
00107                 }
00108                 return array( $socket );
00109         }
00110 
00115         public function getWriteSocketsForSelect() {
00116                 if ( !strlen( $this->writeBuffer ) ) {
00117                         return array();
00118                 }
00119                 $socket = $this->getSocket();
00120                 if ( $socket === false ) {
00121                         return array();
00122                 }
00123                 return array( $socket );
00124         }
00125 
00130         protected function getIP() {
00131                 if ( $this->ip === null ) {
00132                         if ( IP::isIPv4( $this->host ) ) {
00133                                 $this->ip = $this->host;
00134                         } elseif ( IP::isIPv6( $this->host ) ) {
00135                                 throw new MWException( '$wgSquidServers does not support IPv6' );
00136                         } else {
00137                                 wfSuppressWarnings();
00138                                 $this->ip = gethostbyname( $this->host );
00139                                 if ( $this->ip === $this->host ) {
00140                                         $this->ip = false;
00141                                 }
00142                                 wfRestoreWarnings();
00143                         }
00144                 }
00145                 return $this->ip;
00146         }
00147 
00152         protected function markDown() {
00153                 $this->close();
00154                 $this->socket = false;
00155         }
00156 
00160         public function close() {
00161                 if ( $this->socket ) {
00162                         wfSuppressWarnings();
00163                         socket_set_block( $this->socket );
00164                         socket_shutdown( $this->socket );
00165                         socket_close( $this->socket );
00166                         wfRestoreWarnings();
00167                 }
00168                 $this->socket = null;
00169                 $this->readBuffer = '';
00170                 // Write buffer is kept since it may contain a request for the next socket
00171         }
00172 
00178         public function queuePurge( $url ) {
00179                 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
00180                 $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
00181                         "Connection: Keep-Alive\r\n" .
00182                         "Proxy-Connection: Keep-Alive\r\n" .
00183                         "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
00184                 if ( $this->currentRequestIndex === null ) {
00185                         $this->nextRequest();
00186                 }
00187         }
00188 
00192         public function isIdle() {
00193                 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
00194         }
00195 
00199         public function doWrites() {
00200                 if ( !strlen( $this->writeBuffer ) ) {
00201                         return;
00202                 }
00203                 $socket = $this->getSocket();
00204                 if ( !$socket ) {
00205                         return;
00206                 }
00207 
00208                 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
00209                         $buf = $this->writeBuffer;
00210                         $flags = MSG_EOR;
00211                 } else {
00212                         $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
00213                         $flags = 0;
00214                 }
00215                 wfSuppressWarnings();
00216                 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
00217                 wfRestoreWarnings();
00218 
00219                 if ( $bytesSent === false ) {
00220                         $error = socket_last_error( $socket );
00221                         if ( $error != self::EAGAIN && $error != self::EINTR ) {
00222                                 $this->log( 'write error: ' . socket_strerror( $error ) );
00223                                 $this->markDown();
00224                         }
00225                         return;
00226                 }
00227 
00228                 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
00229         }
00230 
00234         public function doReads() {
00235                 $socket = $this->getSocket();
00236                 if ( !$socket ) {
00237                         return;
00238                 }
00239 
00240                 $buf = '';
00241                 wfSuppressWarnings();
00242                 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
00243                 wfRestoreWarnings();
00244                 if ( $bytesRead === false ) {
00245                         $error = socket_last_error( $socket );
00246                         if ( $error != self::EAGAIN && $error != self::EINTR ) {
00247                                 $this->log( 'read error: ' . socket_strerror( $error ) );
00248                                 $this->markDown();
00249                                 return;
00250                         }
00251                 } elseif ( $bytesRead === 0 ) {
00252                         // Assume EOF
00253                         $this->close();
00254                         return;
00255                 }
00256 
00257                 $this->readBuffer .= $buf;
00258                 while ( $this->socket && $this->processReadBuffer() === 'continue' );
00259         }
00260 
00265         protected function processReadBuffer() {
00266                 switch ( $this->readState ) {
00267                 case 'idle':
00268                         return 'done';
00269                 case 'status':
00270                 case 'header':
00271                         $lines = explode( "\r\n", $this->readBuffer, 2 );
00272                         if ( count( $lines ) < 2 ) {
00273                                 return 'done';
00274                         }
00275                         if ( $this->readState == 'status' )  {
00276                                 $this->processStatusLine( $lines[0] );
00277                         } else { // header
00278                                 $this->processHeaderLine( $lines[0] );
00279                         }
00280                         $this->readBuffer = $lines[1];
00281                         return 'continue';
00282                 case 'body':
00283                         if ( $this->bodyRemaining !== null ) {
00284                                 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
00285                                         $this->bodyRemaining -= strlen( $this->readBuffer );
00286                                         $this->readBuffer = '';
00287                                         return 'done';
00288                                 } else {
00289                                         $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
00290                                         $this->bodyRemaining = 0;
00291                                         $this->nextRequest();
00292                                         return 'continue';
00293                                 }
00294                         } else {
00295                                 // No content length, read all data to EOF
00296                                 $this->readBuffer = '';
00297                                 return 'done';
00298                         }
00299                 default:
00300                         throw new MWException( __METHOD__.': unexpected state' );
00301                 }
00302         }
00303 
00308         protected function processStatusLine( $line ) {
00309                 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
00310                         $this->log( 'invalid status line' );
00311                         $this->markDown();
00312                         return;
00313                 }
00314                 list( , , , $status, $reason ) = $m;
00315                 $status = intval( $status );
00316                 if ( $status !== 200 && $status !== 404 ) {
00317                         $this->log( "unexpected status code: $status $reason" );
00318                         $this->markDown();
00319                         return;
00320                 }
00321                 $this->readState = 'header';
00322         }
00323 
00327         protected function processHeaderLine( $line ) {
00328                 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
00329                         $this->bodyRemaining = intval( $m[1] );
00330                 } elseif ( $line === '' ) {
00331                         $this->readState = 'body';
00332                 }
00333         }
00334 
00335         protected function nextRequest() {
00336                 if ( $this->currentRequestIndex !== null ) {
00337                         unset( $this->requests[$this->currentRequestIndex] );
00338                 }
00339                 if ( count( $this->requests ) ) {
00340                         $this->readState = 'status';
00341                         $this->currentRequestIndex = key( $this->requests );
00342                         $this->writeBuffer = $this->requests[$this->currentRequestIndex];
00343                 } else {
00344                         $this->readState = 'idle';
00345                         $this->currentRequestIndex = null;
00346                         $this->writeBuffer = '';
00347                 }
00348                 $this->bodyRemaining = null;
00349         }
00350 
00354         protected function log( $msg ) {
00355                 wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
00356         }
00357 }
00358 
00359 class SquidPurgeClientPool {
00360 
00364         var $clients = array();
00365         var $timeout = 5;
00366 
00370         function __construct( $options = array() ) {
00371                 if ( isset( $options['timeout'] ) ) {
00372                         $this->timeout = $options['timeout'];
00373                 }
00374         }
00375 
00380         public function addClient( $client ) {
00381                 $this->clients[] = $client;
00382         }
00383 
00384         public function run() {
00385                 $done = false;
00386                 $startTime = microtime( true );
00387                 while ( !$done ) {
00388                         $readSockets = $writeSockets = array();
00392                         foreach ( $this->clients as $clientIndex => $client ) {
00393                                 $sockets = $client->getReadSocketsForSelect();
00394                                 foreach ( $sockets as $i => $socket ) {
00395                                         $readSockets["$clientIndex/$i"] = $socket;
00396                                 }
00397                                 $sockets = $client->getWriteSocketsForSelect();
00398                                 foreach ( $sockets as $i => $socket ) {
00399                                         $writeSockets["$clientIndex/$i"] = $socket;
00400                                 }
00401                         }
00402                         if ( !count( $readSockets ) && !count( $writeSockets ) ) {
00403                                 break;
00404                         }
00405                         $exceptSockets = null;
00406                         $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
00407                         wfSuppressWarnings();
00408                         $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
00409                         wfRestoreWarnings();
00410                         if ( $numReady === false ) {
00411                                 wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' . 
00412                                         socket_strerror( socket_last_error() ) . "\n" );
00413                                 break;
00414                         }
00415                         // Check for timeout, use 1% tolerance since we aimed at having socket_select()
00416                         // exit at precisely the overall timeout
00417                         if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
00418                                 wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
00419                                 break;
00420                         } elseif ( !$numReady ) {
00421                                 continue;
00422                         }
00423 
00424                         foreach ( $readSockets as $key => $socket ) {
00425                                 list( $clientIndex, ) = explode( '/', $key );
00426                                 $client = $this->clients[$clientIndex];
00427                                 $client->doReads();
00428                         }
00429                         foreach ( $writeSockets as $key => $socket ) {
00430                                 list( $clientIndex, ) = explode( '/', $key );
00431                                 $client = $this->clients[$clientIndex];
00432                                 $client->doWrites();
00433                         }
00434 
00435                         $done = true;
00436                         foreach ( $this->clients as $client ) {
00437                                 if ( !$client->isIdle() ) {
00438                                         $done = false;
00439                                 }
00440                         }
00441                 }
00442                 foreach ( $this->clients as $client ) {
00443                         $client->close();
00444                 }
00445         }
00446 }