[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
1 <?php 2 /** 3 * Squid and Varnish cache purging. 4 * 5 * This program is free software; you can redistribute it and/or modify 6 * it under the terms of the GNU General Public License as published by 7 * the Free Software Foundation; either version 2 of the License, or 8 * (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU General Public License for more details. 14 * 15 * You should have received a copy of the GNU General Public License along 16 * with this program; if not, write to the Free Software Foundation, Inc., 17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 18 * http://www.gnu.org/copyleft/gpl.html 19 * 20 * @file 21 */ 22 23 /** 24 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish. 25 * Uses asynchronous I/O, allowing purges to be done in a highly parallel 26 * manner. 27 * 28 * Could be replaced by curl_multi_exec() or some such. 29 */ 30 class SquidPurgeClient { 31 /** @var string */ 32 protected $host; 33 34 /** @var int */ 35 protected $port; 36 37 /** @var string|bool */ 38 protected $ip; 39 40 /** @var string */ 41 protected $readState = 'idle'; 42 43 /** @var string */ 44 protected $writeBuffer = ''; 45 46 /** @var array */ 47 protected $requests = array(); 48 49 /** @var mixed */ 50 protected $currentRequestIndex; 51 52 const EINTR = 4; 53 const EAGAIN = 11; 54 const EINPROGRESS = 115; 55 const BUFFER_SIZE = 8192; 56 57 /** 58 * @var resource|null The socket resource, or null for unconnected, or false 59 * for disabled due to error. 60 */ 61 protected $socket; 62 63 /** @var string */ 64 protected $readBuffer; 65 66 /** @var int */ 67 protected $bodyRemaining; 68 69 /** 70 * @param string $server 71 * @param array $options 72 */ 73 public function __construct( $server, $options = array() ) { 74 $parts = explode( ':', $server, 2 ); 75 $this->host = $parts[0]; 76 $this->port = isset( $parts[1] ) ? $parts[1] : 80; 77 } 78 79 /** 80 * Open a socket if there isn't one open already, return it. 81 * Returns false on error. 82 * 83 * @return bool|resource 84 */ 85 protected function getSocket() { 86 if ( $this->socket !== null ) { 87 return $this->socket; 88 } 89 90 $ip = $this->getIP(); 91 if ( !$ip ) { 92 $this->log( "DNS error" ); 93 $this->markDown(); 94 return false; 95 } 96 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 97 socket_set_nonblock( $this->socket ); 98 wfSuppressWarnings(); 99 $ok = socket_connect( $this->socket, $ip, $this->port ); 100 wfRestoreWarnings(); 101 if ( !$ok ) { 102 $error = socket_last_error( $this->socket ); 103 if ( $error !== self::EINPROGRESS ) { 104 $this->log( "connection error: " . socket_strerror( $error ) ); 105 $this->markDown(); 106 return false; 107 } 108 } 109 110 return $this->socket; 111 } 112 113 /** 114 * Get read socket array for select() 115 * @return array 116 */ 117 public function getReadSocketsForSelect() { 118 if ( $this->readState == 'idle' ) { 119 return array(); 120 } 121 $socket = $this->getSocket(); 122 if ( $socket === false ) { 123 return array(); 124 } 125 return array( $socket ); 126 } 127 128 /** 129 * Get write socket array for select() 130 * @return array 131 */ 132 public function getWriteSocketsForSelect() { 133 if ( !strlen( $this->writeBuffer ) ) { 134 return array(); 135 } 136 $socket = $this->getSocket(); 137 if ( $socket === false ) { 138 return array(); 139 } 140 return array( $socket ); 141 } 142 143 /** 144 * Get the host's IP address. 145 * Does not support IPv6 at present due to the lack of a convenient interface in PHP. 146 * @throws MWException 147 * @return string 148 */ 149 protected function getIP() { 150 if ( $this->ip === null ) { 151 if ( IP::isIPv4( $this->host ) ) { 152 $this->ip = $this->host; 153 } elseif ( IP::isIPv6( $this->host ) ) { 154 throw new MWException( '$wgSquidServers does not support IPv6' ); 155 } else { 156 wfSuppressWarnings(); 157 $this->ip = gethostbyname( $this->host ); 158 if ( $this->ip === $this->host ) { 159 $this->ip = false; 160 } 161 wfRestoreWarnings(); 162 } 163 } 164 return $this->ip; 165 } 166 167 /** 168 * Close the socket and ignore any future purge requests. 169 * This is called if there is a protocol error. 170 */ 171 protected function markDown() { 172 $this->close(); 173 $this->socket = false; 174 } 175 176 /** 177 * Close the socket but allow it to be reopened for future purge requests 178 */ 179 public function close() { 180 if ( $this->socket ) { 181 wfSuppressWarnings(); 182 socket_set_block( $this->socket ); 183 socket_shutdown( $this->socket ); 184 socket_close( $this->socket ); 185 wfRestoreWarnings(); 186 } 187 $this->socket = null; 188 $this->readBuffer = ''; 189 // Write buffer is kept since it may contain a request for the next socket 190 } 191 192 /** 193 * Queue a purge operation 194 * 195 * @param string $url 196 */ 197 public function queuePurge( $url ) { 198 global $wgSquidPurgeUseHostHeader; 199 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); 200 $request = array(); 201 if ( $wgSquidPurgeUseHostHeader ) { 202 $url = wfParseUrl( $url ); 203 $host = $url['host']; 204 if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) { 205 $host .= ":" . $url['port']; 206 } 207 $path = $url['path']; 208 if ( isset( $url['query'] ) && is_string( $url['query'] ) ) { 209 $path = wfAppendQuery( $path, $url['query'] ); 210 } 211 $request[] = "PURGE $path HTTP/1.1"; 212 $request[] = "Host: $host"; 213 } else { 214 $request[] = "PURGE $url HTTP/1.0"; 215 } 216 $request[] = "Connection: Keep-Alive"; 217 $request[] = "Proxy-Connection: Keep-Alive"; 218 $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__; 219 // Two ''s to create \r\n\r\n 220 $request[] = ''; 221 $request[] = ''; 222 223 $this->requests[] = implode( "\r\n", $request ); 224 if ( $this->currentRequestIndex === null ) { 225 $this->nextRequest(); 226 } 227 } 228 229 /** 230 * @return bool 231 */ 232 public function isIdle() { 233 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; 234 } 235 236 /** 237 * Perform pending writes. Call this when socket_select() indicates that writing will not block. 238 */ 239 public function doWrites() { 240 if ( !strlen( $this->writeBuffer ) ) { 241 return; 242 } 243 $socket = $this->getSocket(); 244 if ( !$socket ) { 245 return; 246 } 247 248 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { 249 $buf = $this->writeBuffer; 250 $flags = MSG_EOR; 251 } else { 252 $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); 253 $flags = 0; 254 } 255 wfSuppressWarnings(); 256 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); 257 wfRestoreWarnings(); 258 259 if ( $bytesSent === false ) { 260 $error = socket_last_error( $socket ); 261 if ( $error != self::EAGAIN && $error != self::EINTR ) { 262 $this->log( 'write error: ' . socket_strerror( $error ) ); 263 $this->markDown(); 264 } 265 return; 266 } 267 268 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); 269 } 270 271 /** 272 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty. 273 */ 274 public function doReads() { 275 $socket = $this->getSocket(); 276 if ( !$socket ) { 277 return; 278 } 279 280 $buf = ''; 281 wfSuppressWarnings(); 282 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); 283 wfRestoreWarnings(); 284 if ( $bytesRead === false ) { 285 $error = socket_last_error( $socket ); 286 if ( $error != self::EAGAIN && $error != self::EINTR ) { 287 $this->log( 'read error: ' . socket_strerror( $error ) ); 288 $this->markDown(); 289 return; 290 } 291 } elseif ( $bytesRead === 0 ) { 292 // Assume EOF 293 $this->close(); 294 return; 295 } 296 297 $this->readBuffer .= $buf; 298 while ( $this->socket && $this->processReadBuffer() === 'continue' ); 299 } 300 301 /** 302 * @throws MWException 303 * @return string 304 */ 305 protected function processReadBuffer() { 306 switch ( $this->readState ) { 307 case 'idle': 308 return 'done'; 309 case 'status': 310 case 'header': 311 $lines = explode( "\r\n", $this->readBuffer, 2 ); 312 if ( count( $lines ) < 2 ) { 313 return 'done'; 314 } 315 if ( $this->readState == 'status' ) { 316 $this->processStatusLine( $lines[0] ); 317 } else { // header 318 $this->processHeaderLine( $lines[0] ); 319 } 320 $this->readBuffer = $lines[1]; 321 return 'continue'; 322 case 'body': 323 if ( $this->bodyRemaining !== null ) { 324 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { 325 $this->bodyRemaining -= strlen( $this->readBuffer ); 326 $this->readBuffer = ''; 327 return 'done'; 328 } else { 329 $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); 330 $this->bodyRemaining = 0; 331 $this->nextRequest(); 332 return 'continue'; 333 } 334 } else { 335 // No content length, read all data to EOF 336 $this->readBuffer = ''; 337 return 'done'; 338 } 339 default: 340 throw new MWException( __METHOD__ . ': unexpected state' ); 341 } 342 } 343 344 /** 345 * @param string $line 346 */ 347 protected function processStatusLine( $line ) { 348 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { 349 $this->log( 'invalid status line' ); 350 $this->markDown(); 351 return; 352 } 353 list( , , , $status, $reason ) = $m; 354 $status = intval( $status ); 355 if ( $status !== 200 && $status !== 404 ) { 356 $this->log( "unexpected status code: $status $reason" ); 357 $this->markDown(); 358 return; 359 } 360 $this->readState = 'header'; 361 } 362 363 /** 364 * @param string $line 365 */ 366 protected function processHeaderLine( $line ) { 367 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { 368 $this->bodyRemaining = intval( $m[1] ); 369 } elseif ( $line === '' ) { 370 $this->readState = 'body'; 371 } 372 } 373 374 protected function nextRequest() { 375 if ( $this->currentRequestIndex !== null ) { 376 unset( $this->requests[$this->currentRequestIndex] ); 377 } 378 if ( count( $this->requests ) ) { 379 $this->readState = 'status'; 380 $this->currentRequestIndex = key( $this->requests ); 381 $this->writeBuffer = $this->requests[$this->currentRequestIndex]; 382 } else { 383 $this->readState = 'idle'; 384 $this->currentRequestIndex = null; 385 $this->writeBuffer = ''; 386 } 387 $this->bodyRemaining = null; 388 } 389 390 /** 391 * @param string $msg 392 */ 393 protected function log( $msg ) { 394 wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" ); 395 } 396 } 397 398 class SquidPurgeClientPool { 399 /** @var array Array of SquidPurgeClient */ 400 protected $clients = array(); 401 402 /** @var int */ 403 protected $timeout = 5; 404 405 /** 406 * @param array $options 407 */ 408 function __construct( $options = array() ) { 409 if ( isset( $options['timeout'] ) ) { 410 $this->timeout = $options['timeout']; 411 } 412 } 413 414 /** 415 * @param SquidPurgeClient $client 416 * @return void 417 */ 418 public function addClient( $client ) { 419 $this->clients[] = $client; 420 } 421 422 public function run() { 423 $done = false; 424 $startTime = microtime( true ); 425 while ( !$done ) { 426 $readSockets = $writeSockets = array(); 427 /** 428 * @var $client SquidPurgeClient 429 */ 430 foreach ( $this->clients as $clientIndex => $client ) { 431 $sockets = $client->getReadSocketsForSelect(); 432 foreach ( $sockets as $i => $socket ) { 433 $readSockets["$clientIndex/$i"] = $socket; 434 } 435 $sockets = $client->getWriteSocketsForSelect(); 436 foreach ( $sockets as $i => $socket ) { 437 $writeSockets["$clientIndex/$i"] = $socket; 438 } 439 } 440 if ( !count( $readSockets ) && !count( $writeSockets ) ) { 441 break; 442 } 443 $exceptSockets = null; 444 $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); 445 wfSuppressWarnings(); 446 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); 447 wfRestoreWarnings(); 448 if ( $numReady === false ) { 449 wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' . 450 socket_strerror( socket_last_error() ) . "\n" ); 451 break; 452 } 453 // Check for timeout, use 1% tolerance since we aimed at having socket_select() 454 // exit at precisely the overall timeout 455 if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { 456 wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" ); 457 break; 458 } elseif ( !$numReady ) { 459 continue; 460 } 461 462 foreach ( $readSockets as $key => $socket ) { 463 list( $clientIndex, ) = explode( '/', $key ); 464 $client = $this->clients[$clientIndex]; 465 $client->doReads(); 466 } 467 foreach ( $writeSockets as $key => $socket ) { 468 list( $clientIndex, ) = explode( '/', $key ); 469 $client = $this->clients[$clientIndex]; 470 $client->doWrites(); 471 } 472 473 $done = true; 474 foreach ( $this->clients as $client ) { 475 if ( !$client->isIdle() ) { 476 $done = false; 477 } 478 } 479 } 480 foreach ( $this->clients as $client ) { 481 $client->close(); 482 } 483 } 484 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |