MediaWiki
REL1_19
|
<?php
/**
 * Non-blocking HTTP client, and a pool that drives several such clients,
 * used to send PURGE requests to the configured proxy cache servers.
 */

/**
 * An HTTP/1.0 client that sends PURGE requests to a single server over a
 * non-blocking socket and parses the responses as they arrive.
 *
 * Requests are queued with queuePurge() and sent one at a time: the next
 * request is only put in the write buffer once the response to the previous
 * one has been fully read. The caller is expected to drive the client with
 * socket_select(), using getReadSocketsForSelect(), getWriteSocketsForSelect(),
 * doReads() and doWrites() (see SquidPurgeClientPool::run()).
 */
class SquidPurgeClient {
	var $host, $port, $ip;

	/** Response parser state: 'idle', 'status', 'header' or 'body' */
	var $readState = 'idle';
	/** Bytes of the current request which have not been sent yet */
	var $writeBuffer = '';
	/** Queue of raw request strings, in the order they were queued */
	var $requests = array();
	/** Key in $requests of the request being processed, or null if none */
	var $currentRequestIndex;

	/** Received bytes which have not been consumed by the parser yet */
	var $readBuffer = '';
	/** Bytes of response body left to read, or null if no Content-Length was seen */
	var $bodyRemaining;

	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	const BUFFER_SIZE = 8192;

	/**
	 * The socket resource, null if no connection has been attempted (or it
	 * was closed), or false if the server has been marked down.
	 */
	var $socket;

	/**
	 * @param $server String: "host" or "host:port"; the port defaults to 80
	 * @param $options Array: currently unused
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		$this->port = isset( $parts[1] ) ? $parts[1] : 80;
	}

	/**
	 * Open a socket if there isn't one open already and return it.
	 * The connection is started in non-blocking mode, so it may still be
	 * in progress when this function returns.
	 *
	 * @return the socket, or false if the server is marked down or the
	 *   connection could not be started
	 */
	protected function getSocket() {
		if ( $this->socket !== null ) {
			// Either an existing socket, or false from a previous markDown()
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}

		$socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $socket === false ) {
			// Do not hand an invalid handle to the socket_* calls below
			$this->log( 'socket creation error: ' . socket_strerror( socket_last_error() ) );
			$this->markDown();
			return false;
		}
		$this->socket = $socket;
		socket_set_nonblock( $this->socket );

		wfSuppressWarnings();
		$ok = socket_connect( $this->socket, $ip, $this->port );
		wfRestoreWarnings();
		if ( !$ok ) {
			$error = socket_last_error( $this->socket );
			// EINPROGRESS is the expected result of a non-blocking connect
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get the read sockets to pass to socket_select().
	 *
	 * @return Array: empty if no response is awaited or the socket is
	 *   unavailable, otherwise a single-element array with the socket
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the write sockets to pass to socket_select().
	 *
	 * @return Array: empty if there is nothing to send or the socket is
	 *   unavailable, otherwise a single-element array with the socket
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the IPv4 address of the server, resolving the host name on first
	 * use. The result, including a failed lookup, is cached in $this->ip.
	 *
	 * @return String|false: the address, or false if the lookup failed
	 * @throws MWException if the host is an IPv6 address
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$this->ip = gethostbyname( $this->host );
				// gethostbyname() returns its argument unchanged on failure
				if ( $this->ip === $this->host ) {
					$this->ip = false;
				}
				wfRestoreWarnings();
			}
		}
		return $this->ip;
	}

	/**
	 * Close the socket and mark the server as down. Since getSocket() only
	 * opens a connection while $this->socket is null, setting it to false
	 * makes every later getSocket() call return false.
	 */
	protected function markDown() {
		$this->close();
		$this->socket = false;
	}

	/**
	 * Shut down and close the socket, if one is open, and discard any
	 * unparsed response data. A later getSocket() call will reconnect.
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a PURGE request for the given URL. If no request is currently
	 * being processed, the new one is moved to the write buffer right away.
	 *
	 * @param $url String: URL to purge; newlines are stripped before it is
	 *   passed to SquidUpdate::expand()
	 */
	public function queuePurge( $url ) {
		$url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
		$this->requests[] = "PURGE $url HTTP/1.0\r\n" .
			"Connection: Keep-Alive\r\n" .
			"Proxy-Connection: Keep-Alive\r\n" .
			"User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * @return Boolean: true if there is nothing left to send and no
	 *   response is being awaited
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Send as much of the write buffer as the socket accepts, at most
	 * BUFFER_SIZE bytes per call. Meant to be called when socket_select()
	 * reports the socket as writable.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			// The whole remainder fits in a single send
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			// EAGAIN and EINTR are transient: keep the buffer and retry later
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		// Keep only the part which has not been sent yet
		$this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read up to BUFFER_SIZE bytes from the socket and feed them to the
	 * response parser. Meant to be called when socket_select() reports the
	 * socket as readable.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
				return;
			}
			// Transient error: fall through with whatever is already buffered
		} elseif ( $bytesRead === 0 ) {
			// Assume EOF
			$this->close();
			return;
		}

		$this->readBuffer .= $buf;
		// Stop as soon as the parser needs more data, or the socket was
		// closed or marked down while handling a line
		while ( $this->socket && $this->processReadBuffer() === 'continue' );
	}

	/**
	 * Consume one unit of the read buffer according to the current state:
	 * one line in the 'status' and 'header' states, or as much of the body
	 * as is available in the 'body' state.
	 *
	 * @return String: 'continue' if the buffer may hold more data to
	 *   process, 'done' if more input is needed or the client is idle
	 * @throws MWException if $this->readState has an unknown value
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
		case 'idle':
			return 'done';
		case 'status':
		case 'header':
			$lines = explode( "\r\n", $this->readBuffer, 2 );
			if ( count( $lines ) < 2 ) {
				// No complete line has been received yet
				return 'done';
			}
			if ( $this->readState == 'status' ) {
				$this->processStatusLine( $lines[0] );
			} else { // header
				$this->processHeaderLine( $lines[0] );
			}
			$this->readBuffer = $lines[1];
			return 'continue';
		case 'body':
			if ( $this->bodyRemaining !== null ) {
				if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
					// Only part of the body has arrived so far
					$this->bodyRemaining -= strlen( $this->readBuffer );
					$this->readBuffer = '';
					return 'done';
				} else {
					// Body complete; whatever follows belongs to the next response
					$this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
					$this->bodyRemaining = 0;
					$this->nextRequest();
					return 'continue';
				}
			} else {
				// No content length, read all data to EOF
				$this->readBuffer = '';
				return 'done';
			}
		default:
			throw new MWException( __METHOD__.': unexpected state' );
		}
	}

	/**
	 * Parse the HTTP status line. Status codes 200 and 404 move the parser
	 * on to the headers; a malformed line or any other status code marks
	 * the server as down.
	 *
	 * @param $line String: status line, without the trailing CRLF
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		// $m[1] and $m[2] are the HTTP version numbers, which are not needed
		$status = intval( $m[3] );
		$reason = $m[4];
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Parse one response header line. Records the Content-Length value and
	 * switches to the 'body' state on the empty line that ends the headers;
	 * all other headers are ignored.
	 *
	 * @param $line String: header line, without the trailing CRLF
	 */
	protected function processHeaderLine( $line ) {
		if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Drop the current request, if any, from the queue and start the next
	 * queued one, or go idle when the queue is empty.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			// Always take the oldest queued request rather than relying on
			// wherever the array's internal pointer was left
			reset( $this->requests );
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message, prefixed with the class and host name, to the
	 * 'squid' debug log group.
	 *
	 * @param $msg String
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
	}
}

/**
 * A set of SquidPurgeClient objects which are driven together by a single
 * socket_select() loop.
 */
class SquidPurgeClientPool {

	/** Array of SquidPurgeClient objects */
	var $clients = array();
	/** Overall time limit for run(), in seconds */
	var $timeout = 5;

	/**
	 * @param $options Array: the only key read is 'timeout', the overall
	 *   time limit for run() in seconds
	 */
	function __construct( $options = array() ) {
		if ( isset( $options['timeout'] ) ) {
			$this->timeout = $options['timeout'];
		}
	}

	/**
	 * @param $client SquidPurgeClient
	 */
	public function addClient( $client ) {
		$this->clients[] = $client;
	}

	/**
	 * Run the select loop until every client is idle, no client has a
	 * socket left to wait on, the overall timeout expires, or
	 * socket_select() fails. All clients are closed before returning.
	 */
	public function run() {
		$done = false;
		$startTime = microtime( true );
		while ( !$done ) {
			// Collect the sockets of all clients, keyed by "clientIndex/socketIndex"
			$readSockets = $writeSockets = array();
			foreach ( $this->clients as $clientIndex => $client ) {
				$sockets = $client->getReadSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$readSockets["$clientIndex/$i"] = $socket;
				}
				$sockets = $client->getWriteSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$writeSockets["$clientIndex/$i"] = $socket;
				}
			}
			if ( !count( $readSockets ) && !count( $writeSockets ) ) {
				break;
			}
			$exceptSockets = null;
			// Wait for at most one second, and not beyond the overall deadline.
			// socket_select() takes whole seconds and microseconds separately,
			// so a fractional wait has to be split instead of being truncated
			// to zero, which would turn the final second into a busy loop.
			$timeout = max( 0, min( $startTime + $this->timeout - microtime( true ), 1 ) );
			$timeoutSec = (int)floor( $timeout );
			$timeoutUsec = (int)( ( $timeout - $timeoutSec ) * 1000000 );
			wfSuppressWarnings();
			$numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
				$timeoutSec, $timeoutUsec );
			wfRestoreWarnings();
			if ( $numReady === false ) {
				wfDebugLog( 'squid', __METHOD__.': Error in socket_select: ' .
					socket_strerror( socket_last_error() ) . "\n" );
				break;
			}
			// Check for timeout, use 1% tolerance since we aimed at having socket_select()
			// exit at precisely the overall timeout
			if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
				wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
				break;
			} elseif ( !$numReady ) {
				continue;
			}

			// socket_select() has reduced both arrays to the ready sockets
			foreach ( $readSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doReads();
			}
			foreach ( $writeSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doWrites();
			}

			$done = true;
			foreach ( $this->clients as $client ) {
				if ( !$client->isIdle() ) {
					$done = false;
				}
			}
		}
		foreach ( $this->clients as $client ) {
			$client->close();
		}
	}
}