MediaWiki  REL1_19
SquidPurgeClient.php
Go to the documentation of this file.
00001 <?php
/**
 * Minimal HTTP/1.0 client that sends PURGE requests to a single proxy server
 * over a non-blocking TCP socket.
 *
 * The client never blocks on its own: a caller is expected to collect the
 * sockets from getReadSocketsForSelect() / getWriteSocketsForSelect(), wait
 * on them, and then call doReads() / doWrites() for whichever became ready.
 *
 * Requests are processed one at a time. $readState tracks which part of the
 * current response is being parsed: 'idle' (nothing outstanding), 'status',
 * 'header' or 'body'.
 */
class SquidPurgeClient {
	/** Host part of the "host[:port]" server string given to the constructor */
	public $host;
	/** Port part of the server string, or 80 when none was given */
	public $port;
	/** Resolved IPv4 address; null until first looked up, false if the lookup failed */
	public $ip;

	/** Response parser state: 'idle', 'status', 'header' or 'body' */
	public $readState = 'idle';
	/** Bytes of the current request that still have to be sent */
	public $writeBuffer = '';
	/** Received bytes that have not been parsed yet */
	public $readBuffer = '';
	/** Queued raw request strings, keyed by insertion index */
	public $requests = array();
	/** Key in $requests of the request in flight, or null when there is none */
	public $currentRequestIndex;
	/** Body bytes still expected for the current response; null when no Content-Length was seen */
	public $bodyRemaining;

	// Numeric errno values compared against socket_last_error() (these match Linux).
	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	/** Maximum number of bytes passed to a single socket_send()/socket_recv() call */
	const BUFFER_SIZE = 8192;

	/**
	 * The socket: null when not connected (a connection is opened on demand),
	 * false when the server has been marked down.
	 */
	public $socket;

	/**
	 * @param $server String: "host" or "host:port"; the port defaults to 80
	 * @param $options Array: accepted for interface compatibility, currently unused
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		$this->port = isset( $parts[1] ) ? $parts[1] : 80;
	}

	/**
	 * Return the socket, opening a non-blocking connection if there is none yet.
	 *
	 * @return The socket, or false if the server is marked down (DNS failure,
	 *   socket creation failure or an immediate connection error).
	 */
	protected function getSocket() {
		if ( $this->socket !== null ) {
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}

		wfSuppressWarnings();
		$socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		wfRestoreWarnings();
		if ( !$socket ) {
			// socket_create() returns false on failure; without this check the
			// calls below would operate on a non-socket value.
			$this->log( "socket creation error: " . socket_strerror( socket_last_error() ) );
			$this->markDown();
			return false;
		}
		$this->socket = $socket;
		socket_set_nonblock( $this->socket );

		wfSuppressWarnings();
		// The port is a string when it was parsed from "host:port".
		$ok = socket_connect( $this->socket, $ip, intval( $this->port ) );
		wfRestoreWarnings();
		if ( !$ok ) {
			$error = socket_last_error( $this->socket );
			// EINPROGRESS is the normal result of connecting a non-blocking socket.
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get the sockets to wait on for readability.
	 *
	 * @return Array: empty when idle or when the server is down, otherwise the one socket
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the sockets to wait on for writability.
	 *
	 * @return Array: empty when nothing is waiting to be sent or when the
	 *   server is down, otherwise the one socket
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Resolve the host to an IPv4 address, caching the result in $this->ip.
	 *
	 * @return String IPv4 address, or false if the name could not be resolved
	 * @throws MWException if the configured host is an IPv6 address
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$this->ip = gethostbyname( $this->host );
				// gethostbyname() hands back its input unchanged on failure.
				if ( $this->ip === $this->host ) {
					$this->ip = false;
				}
				wfRestoreWarnings();
			}
		}
		return $this->ip;
	}

	/**
	 * Close the connection and mark the server as down, so that getSocket()
	 * returns false instead of reconnecting.
	 */
	protected function markDown() {
		$this->close();
		$this->socket = false;
	}

	/**
	 * Close the socket (if open) and discard unparsed input. A later
	 * getSocket() call opens a new connection.
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a PURGE request for a URL and start it if nothing is in flight.
	 *
	 * @param $url String
	 */
	public function queuePurge( $url ) {
		// Drop CR as well as LF: either could break the request line apart.
		$url = SquidUpdate::expand( str_replace( array( "\r", "\n" ), '', $url ) );
		$this->requests[] = "PURGE $url HTTP/1.0\r\n" .
			"Connection: Keep-Alive\r\n" .
			"Proxy-Connection: Keep-Alive\r\n" .
			"User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * @return Boolean: true when nothing is waiting to be sent and no response is pending
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Send as much of the write buffer as one socket_send() call accepts.
	 * Errors other than EAGAIN/EINTR mark the server down.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		// Keep whatever was not accepted for the next call.
		$this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read available data from the socket and feed it to the response parser.
	 * Errors other than EAGAIN/EINTR mark the server down; EOF closes the
	 * connection.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
				return;
			}
			// Retryable error: nothing was received, but fall through so that
			// any data already buffered is still parsed.
		} elseif ( $bytesRead === 0 ) {
			// Assume EOF. For a body without Content-Length, EOF is what ends
			// the response, so the request is complete and the queue has to
			// advance; otherwise the client would never become idle again.
			$bodyEndedByEof = $this->readState == 'body' && $this->bodyRemaining === null;
			$this->close();
			if ( $bodyEndedByEof ) {
				$this->nextRequest();
			}
			return;
		}

		$this->readBuffer .= $buf;
		// Stop as soon as the parser needs more data or the socket went away
		// (the parser marks the server down on malformed responses).
		while ( $this->socket && $this->processReadBuffer() === 'continue' );
	}

	/**
	 * Run one step of the response parser on the read buffer.
	 *
	 * @return String: 'continue' if another step may make progress, 'done' if
	 *   more input is needed (or there is nothing to do)
	 * @throws MWException if $readState holds an unknown value
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
		case 'idle':
			return 'done';
		case 'status':
		case 'header':
			// Both states consume exactly one CRLF-terminated line.
			$lines = explode( "\r\n", $this->readBuffer, 2 );
			if ( count( $lines ) < 2 ) {
				return 'done';
			}
			if ( $this->readState == 'status' ) {
				$this->processStatusLine( $lines[0] );
			} else { // header
				$this->processHeaderLine( $lines[0] );
			}
			$this->readBuffer = $lines[1];
			return 'continue';
		case 'body':
			if ( $this->bodyRemaining !== null ) {
				if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
					// Whole buffer belongs to this body; more is still to come.
					$this->bodyRemaining -= strlen( $this->readBuffer );
					$this->readBuffer = '';
					return 'done';
				} else {
					// Body complete; anything left over belongs to the next response.
					$this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
					$this->bodyRemaining = 0;
					$this->nextRequest();
					return 'continue';
				}
			} else {
				// No content length, read all data to EOF
				$this->readBuffer = '';
				return 'done';
			}
		default:
			throw new MWException( __METHOD__.': unexpected state' );
		}
	}

	/**
	 * Parse the HTTP status line. Anything other than a well-formed 200 or
	 * 404 response marks the server down.
	 *
	 * @param $line String: status line without the trailing CRLF
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		$status = intval( $m[3] );
		$reason = $m[4];
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Parse one header line: remember Content-Length, and switch to the body
	 * state on the empty line that ends the headers. Other headers are ignored.
	 *
	 * @param $line String: header line without the trailing CRLF
	 */
	protected function processHeaderLine( $line ) {
		if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Drop the request in flight (if any) and start the next queued one, or
	 * go idle when the queue is empty.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message, prefixed with the class and host name, to the 'squid' debug log.
	 *
	 * @param $msg String
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
	}
}
00326 
/**
 * Runs a set of purge clients in parallel by multiplexing their sockets
 * through socket_select() until every client is idle or an overall timeout
 * expires.
 */
class SquidPurgeClientPool {

	/** Array of client objects added with addClient() */
	public $clients = array();
	/** Overall time limit for run(), in seconds */
	public $timeout = 5;

	/**
	 * @param $options Array: recognised key: 'timeout' (seconds, default 5)
	 */
	public function __construct( $options = array() ) {
		if ( isset( $options['timeout'] ) ) {
			$this->timeout = $options['timeout'];
		}
	}

	/**
	 * Add a client to the pool.
	 *
	 * @param $client SquidPurgeClient
	 */
	public function addClient( $client ) {
		$this->clients[] = $client;
	}

	/**
	 * Service all clients until they are all idle, no client has a socket to
	 * wait on, socket_select() fails, or the overall timeout is reached.
	 * Every client is closed before returning.
	 */
	public function run() {
		$done = false;
		$startTime = microtime( true );
		while ( !$done ) {
			// Collect sockets keyed "clientIndex/socketIndex" so that ready
			// sockets can be mapped back to their client after the select.
			$readSockets = $writeSockets = array();
			foreach ( $this->clients as $clientIndex => $client ) {
				foreach ( $client->getReadSocketsForSelect() as $i => $socket ) {
					$readSockets["$clientIndex/$i"] = $socket;
				}
				foreach ( $client->getWriteSocketsForSelect() as $i => $socket ) {
					$writeSockets["$clientIndex/$i"] = $socket;
				}
			}
			if ( !count( $readSockets ) && !count( $writeSockets ) ) {
				break;
			}

			$exceptSockets = null;
			// Wait at most one second per iteration, and never past the
			// overall deadline. socket_select() takes whole seconds plus
			// microseconds, so the float has to be split: passed as-is, a
			// sub-second wait would be cut down to zero.
			$remaining = min( $startTime + $this->timeout - microtime( true ), 1 );
			$parts = self::splitTimeout( $remaining );
			wfSuppressWarnings();
			$numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
				$parts[0], $parts[1] );
			wfRestoreWarnings();
			if ( $numReady === false ) {
				wfDebugLog( 'squid', __METHOD__.': Error in socket_select: ' .
					socket_strerror( socket_last_error() ) . "\n" );
				break;
			}
			// Check for timeout, use 1% tolerance since we aimed at having socket_select()
			// exit at precisely the overall timeout
			if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
				wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
				break;
			} elseif ( !$numReady ) {
				continue;
			}

			// socket_select() reduced both arrays to the ready sockets.
			foreach ( $readSockets as $key => $socket ) {
				$keyParts = explode( '/', $key );
				$this->clients[$keyParts[0]]->doReads();
			}
			foreach ( $writeSockets as $key => $socket ) {
				$keyParts = explode( '/', $key );
				$this->clients[$keyParts[0]]->doWrites();
			}

			$done = true;
			foreach ( $this->clients as $client ) {
				if ( !$client->isIdle() ) {
					$done = false;
				}
			}
		}
		foreach ( $this->clients as $client ) {
			$client->close();
		}
	}

	/**
	 * Split a timeout in seconds into the whole-second and microsecond parts
	 * that socket_select() expects. Values of zero or less become a zero
	 * timeout.
	 *
	 * @param $seconds Float
	 * @return Array: ( int seconds, int microseconds )
	 */
	protected static function splitTimeout( $seconds ) {
		if ( $seconds <= 0 ) {
			return array( 0, 0 );
		}
		$sec = (int)floor( $seconds );
		$usec = (int)round( ( $seconds - $sec ) * 1000000 );
		if ( $usec >= 1000000 ) {
			// Rounding carried into the next whole second.
			$sec++;
			$usec = 0;
		}
		return array( $sec, $usec );
	}
}