MediaWiki  REL1_21
SquidPurgeClient.php
Go to the documentation of this file.
00001 <?php
/**
 * Non-blocking HTTP client that sends PURGE requests to a single proxy server.
 *
 * Requests are queued with queuePurge() and then driven one at a time by an
 * external select loop (see SquidPurgeClientPool::run()): the loop asks for the
 * sockets to watch, then calls doReads()/doWrites() when they become ready.
 *
 * Read states: 'idle' (nothing outstanding), 'status' (waiting for the status
 * line), 'header' (reading header lines), 'body' (skipping the response body).
 */
class SquidPurgeClient {
    /** Host part of the server string passed to the constructor. */
    public $host;
    /** Port part of the server string, or 80 when none was given. */
    public $port;
    /** Cached IPv4 address; null = not resolved yet, false = resolution failed. */
    public $ip;

    /** Current response parser state: 'idle', 'status', 'header' or 'body'. */
    public $readState = 'idle';
    /** Bytes of the current request that still have to be sent. */
    public $writeBuffer = '';
    /** Queue of fully formatted request strings, including the current one. */
    public $requests = array();
    /** Key in $requests of the request being processed, or null when idle. */
    public $currentRequestIndex;

    // Hard-coded socket error numbers that are treated as non-fatal.
    const EINTR = 4;
    const EAGAIN = 11;
    const EINPROGRESS = 115;
    /** Maximum number of bytes sent or received per socket call. */
    const BUFFER_SIZE = 8192;

    /** The socket; null = not opened yet, false = server marked down. */
    public $socket;

    /** Received bytes that have not been parsed yet. */
    public $readBuffer;

    /** Body bytes still expected, or null when no Content-Length was seen. */
    public $bodyRemaining;

    /**
     * @param string $server Server as "host" or "host:port"; port defaults to 80.
     * @param array $options Currently unused.
     */
    public function __construct( $server, $options = array() ) {
        $parts = explode( ':', $server, 2 );
        $this->host = $parts[0];
        $this->port = isset( $parts[1] ) ? $parts[1] : 80;
    }

    /**
     * Return the socket, opening a non-blocking connection on first use.
     *
     * @return mixed The socket, or false if the server is marked down or the
     *   connection could not be started.
     */
    protected function getSocket() {
        // null means "not opened yet"; false (marked down) is returned as-is.
        if ( $this->socket !== null ) {
            return $this->socket;
        }

        $ip = $this->getIP();
        if ( !$ip ) {
            $this->log( "DNS error" );
            $this->markDown();
            return false;
        }

        $socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
        if ( !$socket ) {
            // Without this check a failed socket_create() would be passed on
            // to socket_set_nonblock()/socket_connect().
            $this->log( "socket creation error: " . socket_strerror( socket_last_error() ) );
            $this->markDown();
            return false;
        }
        $this->socket = $socket;

        socket_set_nonblock( $this->socket );
        wfSuppressWarnings();
        $ok = socket_connect( $this->socket, $ip, $this->port );
        wfRestoreWarnings();
        if ( !$ok ) {
            $error = socket_last_error( $this->socket );
            // EINPROGRESS is expected for a non-blocking connect.
            if ( $error !== self::EINPROGRESS ) {
                $this->log( "connection error: " . socket_strerror( $error ) );
                $this->markDown();
                return false;
            }
        }

        return $this->socket;
    }

    /**
     * Get the sockets to watch for readability.
     *
     * @return array Empty when idle or when no socket is available, otherwise
     *   a single-element array holding the socket.
     */
    public function getReadSocketsForSelect() {
        if ( $this->readState == 'idle' ) {
            return array();
        }
        $socket = $this->getSocket();
        if ( $socket === false ) {
            return array();
        }
        return array( $socket );
    }

    /**
     * Get the sockets to watch for writability.
     *
     * @return array Empty when there is nothing to send or no socket is
     *   available, otherwise a single-element array holding the socket.
     */
    public function getWriteSocketsForSelect() {
        if ( !strlen( $this->writeBuffer ) ) {
            return array();
        }
        $socket = $this->getSocket();
        if ( $socket === false ) {
            return array();
        }
        return array( $socket );
    }

    /**
     * Resolve the host to an IPv4 address, caching the result in $this->ip.
     *
     * @return string|bool The address, or false when the name did not resolve.
     * @throws MWException If the host is an IPv6 address.
     */
    protected function getIP() {
        if ( $this->ip === null ) {
            if ( IP::isIPv4( $this->host ) ) {
                $this->ip = $this->host;
            } elseif ( IP::isIPv6( $this->host ) ) {
                throw new MWException( '$wgSquidServers does not support IPv6' );
            } else {
                wfSuppressWarnings();
                $this->ip = gethostbyname( $this->host );
                // gethostbyname() returns its argument unchanged on failure.
                if ( $this->ip === $this->host ) {
                    $this->ip = false;
                }
                wfRestoreWarnings();
            }
        }
        return $this->ip;
    }

    /**
     * Close the connection and mark the server as down, so that getSocket()
     * returns false from now on instead of reconnecting.
     */
    protected function markDown() {
        $this->close();
        $this->socket = false;
    }

    /**
     * Close the socket if one is open and discard unparsed input.
     * A later getSocket() call will open a new connection.
     */
    public function close() {
        if ( $this->socket ) {
            wfSuppressWarnings();
            socket_set_block( $this->socket );
            socket_shutdown( $this->socket );
            socket_close( $this->socket );
            wfRestoreWarnings();
        }
        $this->socket = null;
        $this->readBuffer = '';
        // Write buffer is kept since it may contain a request for the next socket
    }

    /**
     * Queue a PURGE request for the given URL and start it if nothing else is
     * in progress.
     *
     * @param string $url URL to purge; it is passed through SquidUpdate::expand().
     */
    public function queuePurge( $url ) {
        global $wgSquidPurgeUseHostHeader;
        // Line feeds are removed before the URL is placed on the request line.
        $url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
        $request = array();
        if ( $wgSquidPurgeUseHostHeader ) {
            // HTTP/1.1 form: path on the request line, host in a Host header.
            $url = wfParseUrl( $url );
            $host = $url['host'];
            if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) {
                $host .= ":" . $url['port'];
            }
            $path = $url['path'];
            if ( isset( $url['query'] ) && is_string( $url['query'] ) ) {
                $path = wfAppendQuery( $path, $url['query'] );
            }
            $request[] = "PURGE $path HTTP/1.1";
            $request[] = "Host: $host";
        } else {
            // HTTP/1.0 form: full URL on the request line.
            $request[] = "PURGE $url HTTP/1.0";
        }
        $request[] = "Connection: Keep-Alive";
        $request[] = "Proxy-Connection: Keep-Alive";
        $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
        // Two ''s to create \r\n\r\n
        $request[] = '';
        $request[] = '';

        $this->requests[] = implode( "\r\n", $request );
        if ( $this->currentRequestIndex === null ) {
            $this->nextRequest();
        }
    }

    /**
     * @return bool True when there is nothing left to send and no response is
     *   being awaited.
     */
    public function isIdle() {
        return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
    }

    /**
     * Send up to BUFFER_SIZE bytes of the write buffer. EAGAIN and EINTR are
     * ignored so the write is retried later; any other error marks the server
     * down.
     */
    public function doWrites() {
        if ( !strlen( $this->writeBuffer ) ) {
            return;
        }
        $socket = $this->getSocket();
        if ( !$socket ) {
            return;
        }

        if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
            // The rest of the request fits in a single send.
            $buf = $this->writeBuffer;
            $flags = MSG_EOR;
        } else {
            $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
            $flags = 0;
        }
        wfSuppressWarnings();
        $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
        wfRestoreWarnings();

        if ( $bytesSent === false ) {
            $error = socket_last_error( $socket );
            if ( $error != self::EAGAIN && $error != self::EINTR ) {
                $this->log( 'write error: ' . socket_strerror( $error ) );
                $this->markDown();
            }
            return;
        }

        // Drop what was actually sent; the remainder goes out on a later call.
        $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
    }

    /**
     * Read available data from the socket and feed it to the response parser.
     * EAGAIN and EINTR are ignored; any other error marks the server down.
     */
    public function doReads() {
        $socket = $this->getSocket();
        if ( !$socket ) {
            return;
        }

        $buf = '';
        wfSuppressWarnings();
        $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
        wfRestoreWarnings();
        if ( $bytesRead === false ) {
            $error = socket_last_error( $socket );
            if ( $error != self::EAGAIN && $error != self::EINTR ) {
                $this->log( 'read error: ' . socket_strerror( $error ) );
                $this->markDown();
                return;
            }
        } elseif ( $bytesRead === 0 ) {
            // Assume EOF
            // A body without Content-Length is terminated by EOF, so the
            // current request is complete. Without advancing here the client
            // would stay in 'body' state and never become idle.
            $bodyEndedByEof = $this->readState === 'body' && $this->bodyRemaining === null;
            $this->close();
            if ( $bodyEndedByEof ) {
                $this->nextRequest();
            }
            return;
        }

        $this->readBuffer .= $buf;
        // Stop parsing as soon as the socket was closed or marked down.
        while ( $this->socket && $this->processReadBuffer() === 'continue' );
    }

    /**
     * Consume one unit (a line or a chunk of body) from the read buffer.
     *
     * @return string 'continue' if more of the buffer may be parseable right
     *   away, 'done' if more input is needed or there is nothing to do.
     * @throws MWException If $this->readState holds an unknown value.
     */
    protected function processReadBuffer() {
        switch ( $this->readState ) {
        case 'idle':
            return 'done';
        case 'status':
        case 'header':
            $lines = explode( "\r\n", $this->readBuffer, 2 );
            if ( count( $lines ) < 2 ) {
                // No complete line in the buffer yet.
                return 'done';
            }
            if ( $this->readState == 'status' ) {
                $this->processStatusLine( $lines[0] );
            } else { // header
                $this->processHeaderLine( $lines[0] );
            }
            $this->readBuffer = $lines[1];
            return 'continue';
        case 'body':
            if ( $this->bodyRemaining !== null ) {
                if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
                    // Whole buffer is body data; more is still to come.
                    $this->bodyRemaining -= strlen( $this->readBuffer );
                    $this->readBuffer = '';
                    return 'done';
                } else {
                    // Body complete; whatever follows belongs to the next response.
                    $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
                    $this->bodyRemaining = 0;
                    $this->nextRequest();
                    return 'continue';
                }
            } else {
                // No content length, read all data to EOF
                $this->readBuffer = '';
                return 'done';
            }
        default:
            throw new MWException( __METHOD__ . ': unexpected state' );
        }
    }

    /**
     * Parse the HTTP status line. Only 200 and 404 are accepted; a malformed
     * line or any other status marks the server down.
     *
     * @param string $line Status line without the trailing CRLF.
     */
    protected function processStatusLine( $line ) {
        if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
            $this->log( 'invalid status line' );
            $this->markDown();
            return;
        }
        list( , , , $status, $reason ) = $m;
        $status = intval( $status );
        if ( $status !== 200 && $status !== 404 ) {
            $this->log( "unexpected status code: $status $reason" );
            $this->markDown();
            return;
        }
        $this->readState = 'header';
    }

    /**
     * Parse one header line: remember Content-Length, and switch to the body
     * state on the empty line that ends the headers. Other headers are ignored.
     *
     * @param string $line Header line without the trailing CRLF.
     */
    protected function processHeaderLine( $line ) {
        // Whitespace around the value is optional in HTTP, so "Content-Length:3"
        // must be recognised as well as "Content-Length: 3".
        if ( preg_match( '/^Content-Length:\s*(\d+)\s*$/i', $line, $m ) ) {
            $this->bodyRemaining = intval( $m[1] );
        } elseif ( $line === '' ) {
            $this->readState = 'body';
        }
    }

    /**
     * Remove the current request from the queue and start the next one, or go
     * idle when the queue is empty.
     */
    protected function nextRequest() {
        if ( $this->currentRequestIndex !== null ) {
            unset( $this->requests[$this->currentRequestIndex] );
        }
        if ( count( $this->requests ) ) {
            $this->readState = 'status';
            $this->currentRequestIndex = key( $this->requests );
            $this->writeBuffer = $this->requests[$this->currentRequestIndex];
        } else {
            $this->readState = 'idle';
            $this->currentRequestIndex = null;
            $this->writeBuffer = '';
        }
        $this->bodyRemaining = null;
    }

    /**
     * Write a message to the 'squid' debug log, prefixed with class and host.
     *
     * @param string $msg
     */
    protected function log( $msg ) {
        wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg\n" );
    }
}
00379 
/**
 * Drives a set of purge clients concurrently with a single select loop.
 */
class SquidPurgeClientPool {

    /** Clients added via addClient(), in insertion order. */
    public $clients = array();
    /** Overall time limit for run(), in seconds. */
    public $timeout = 5;

    /**
     * @param array $options Recognised key: 'timeout' (seconds, default 5).
     */
    public function __construct( $options = array() ) {
        if ( isset( $options['timeout'] ) ) {
            $this->timeout = $options['timeout'];
        }
    }

    /**
     * Add a client to the pool.
     *
     * @param SquidPurgeClient $client
     */
    public function addClient( $client ) {
        $this->clients[] = $client;
    }

    /**
     * Run the select loop until every client is idle, no client has a socket
     * to wait on, an error occurs in socket_select(), or the overall timeout
     * expires. All clients are closed before returning.
     */
    public function run() {
        $done = false;
        $startTime = microtime( true );
        while ( !$done ) {
            $readSockets = $writeSockets = array();
            // Keys are "<client index>/<socket index>" so that ready sockets
            // can be mapped back to their client after socket_select().
            foreach ( $this->clients as $clientIndex => $client ) {
                $sockets = $client->getReadSocketsForSelect();
                foreach ( $sockets as $i => $socket ) {
                    $readSockets["$clientIndex/$i"] = $socket;
                }
                $sockets = $client->getWriteSocketsForSelect();
                foreach ( $sockets as $i => $socket ) {
                    $writeSockets["$clientIndex/$i"] = $socket;
                }
            }
            if ( !count( $readSockets ) && !count( $writeSockets ) ) {
                break;
            }
            $exceptSockets = null;
            // Wait for at most one second, and never past the overall deadline.
            // socket_select() takes whole seconds and microseconds as separate
            // integers, so the fractional remainder has to be split out;
            // passing it as the seconds argument would truncate it to zero.
            $remaining = max( 0, min( $startTime + $this->timeout - microtime( true ), 1 ) );
            $seconds = (int)floor( $remaining );
            $microseconds = (int)( ( $remaining - $seconds ) * 1000000 );
            wfSuppressWarnings();
            $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
                $seconds, $microseconds );
            wfRestoreWarnings();
            if ( $numReady === false ) {
                wfDebugLog( 'squid', __METHOD__ . ': Error in socket_select: ' .
                    socket_strerror( socket_last_error() ) . "\n" );
                break;
            }
            // Check for timeout, use 1% tolerance since we aimed at having socket_select()
            // exit at precisely the overall timeout
            if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
                wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" );
                break;
            } elseif ( !$numReady ) {
                continue;
            }

            // After socket_select() the arrays contain only the ready sockets.
            foreach ( $readSockets as $key => $socket ) {
                list( $clientIndex, ) = explode( '/', $key );
                $client = $this->clients[$clientIndex];
                $client->doReads();
            }
            foreach ( $writeSockets as $key => $socket ) {
                list( $clientIndex, ) = explode( '/', $key );
                $client = $this->clients[$clientIndex];
                $client->doWrites();
            }

            $done = true;
            foreach ( $this->clients as $client ) {
                if ( !$client->isIdle() ) {
                    $done = false;
                }
            }
        }
        foreach ( $this->clients as $client ) {
            $client->close();
        }
    }
}
00466         }
00467 }