Update migrateUserGroup to deal with primary key issue
[mediawiki.git] / includes / SquidPurgeClient.php
blob7cd2b033b0c133c84f352cb2da40d3f29edc6fb5
<?php
/**
 * Squid and Varnish cache purging.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

/**
 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 * Uses asynchronous I/O, allowing purges to be done in a highly parallel
 * manner.
 *
 * Requests are queued with queuePurge() and sent one at a time over a
 * non-blocking socket; the response to each request is parsed by a small
 * state machine (status line, headers, body) before the next one is sent.
 *
 * Could be replaced by curl_multi_exec() or some such.
 */
class SquidPurgeClient {
	/** @var string Host part of the server specification given to the constructor */
	public $host;

	/** @var int|string Port part of the server specification, or 80 if none was given */
	public $port;

	/** @var string|bool|null IPv4 address of the host; null until resolved, false if resolution failed */
	public $ip;

	/** @var string Response parser state: 'idle', 'status', 'header' or 'body' */
	public $readState = 'idle';

	/** @var string Part of the current request that has not been sent yet */
	public $writeBuffer = '';

	/** @var array Raw request strings waiting to be sent, including the one in flight */
	public $requests = array();

	/** @var int|null Key in $requests of the request in flight, or null if there is none */
	public $currentRequestIndex;

	/**
	 * @var string Received data that has not been parsed yet.
	 * Declared explicitly (it used to be created dynamically) so that the first
	 * append in doReads() does not hit an undefined property.
	 */
	public $readBuffer = '';

	/** @var int|null Body bytes still expected for the current response, null if no Content-Length was seen */
	public $bodyRemaining;

	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	const BUFFER_SIZE = 8192;

	/**
	 * The socket resource, or null for unconnected, or false for disabled due to error
	 */
	public $socket;

	/**
	 * @param $server string Server specification, "host" or "host:port"
	 * @param $options array Not used by this class
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		$this->port = isset( $parts[1] ) ? $parts[1] : 80;
	}

	/**
	 * Open a socket if there isn't one open already, return it.
	 * Returns false on error.
	 *
	 * @return bool|resource
	 */
	protected function getSocket() {
		if ( $this->socket !== null ) {
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}
		$this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $this->socket === false ) {
			// Without this check the calls below would be made on a non-socket
			$this->log( "socket_create error: " . socket_strerror( socket_last_error() ) );
			$this->markDown();
			return false;
		}
		socket_set_nonblock( $this->socket );
		wfSuppressWarnings();
		$ok = socket_connect( $this->socket, $ip, $this->port );
		wfRestoreWarnings();
		if ( !$ok ) {
			// A non-blocking connect normally reports EINPROGRESS; anything else is fatal
			$error = socket_last_error( $this->socket );
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get read socket array for select()
	 * @return array Empty when no response is awaited or the socket is unavailable
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get write socket array for select()
	 * @return array Empty when there is nothing to send or the socket is unavailable
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the host's IP address.
	 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
	 *
	 * @throws MWException If the configured host is an IPv6 address
	 * @return string|bool The IPv4 address, or false if the name could not be resolved
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$this->ip = gethostbyname( $this->host );
				// gethostbyname() returns its argument unchanged on failure
				if ( $this->ip === $this->host ) {
					$this->ip = false;
				}
				wfRestoreWarnings();
			}
		}
		return $this->ip;
	}

	/**
	 * Close the socket and ignore any future purge requests.
	 * This is called if there is a protocol error.
	 */
	protected function markDown() {
		$this->close();
		$this->socket = false;
	}

	/**
	 * Close the socket but allow it to be reopened for future purge requests
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a purge operation
	 *
	 * @param $url string
	 */
	public function queuePurge( $url ) {
		// Strip CR as well as LF so the URL cannot break out of the request line
		$url = SquidUpdate::expand( str_replace( array( "\r", "\n" ), '', $url ) );
		$this->requests[] = "PURGE $url HTTP/1.0\r\n" .
			"Connection: Keep-Alive\r\n" .
			"Proxy-Connection: Keep-Alive\r\n" .
			"User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * Whether there is nothing left to send and no response is awaited.
	 *
	 * @return bool
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		// Send at most BUFFER_SIZE bytes per call
		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			// EAGAIN and EINTR are transient: keep the buffer and retry on the next call
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		$this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			// Transient error: nothing was received, so there is nothing new to parse
			return;
		} elseif ( $bytesRead === 0 ) {
			// Assume EOF
			$bodyEndsAtEof = $this->readState === 'body' && $this->bodyRemaining === null;
			$this->close();
			if ( $bodyEndsAtEof ) {
				// A body without Content-Length is terminated by EOF, so the response
				// is complete. Move on, otherwise this client would never become idle.
				$this->nextRequest();
			}
			return;
		}

		$this->readBuffer .= $buf;
		// Parse as much as possible; stop if the parser closed or disabled the socket
		while ( $this->socket && $this->processReadBuffer() === 'continue' );
	}

	/**
	 * Consume one unit (a line or a chunk of body) from the read buffer
	 * according to the current read state.
	 *
	 * @throws MWException If the read state is not a known one
	 * @return string 'continue' if more of the buffer may be parseable, 'done' otherwise
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
		case 'idle':
			return 'done';
		case 'status':
		case 'header':
			$lines = explode( "\r\n", $this->readBuffer, 2 );
			if ( count( $lines ) < 2 ) {
				// No complete line yet
				return 'done';
			}
			if ( $this->readState == 'status' ) {
				$this->processStatusLine( $lines[0] );
			} else { // header
				$this->processHeaderLine( $lines[0] );
			}
			$this->readBuffer = $lines[1];
			return 'continue';
		case 'body':
			if ( $this->bodyRemaining !== null ) {
				if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
					$this->bodyRemaining -= strlen( $this->readBuffer );
					$this->readBuffer = '';
					return 'done';
				} else {
					// The body is complete; whatever follows belongs to the next response
					$this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
					$this->bodyRemaining = 0;
					$this->nextRequest();
					return 'continue';
				}
			} else {
				// No content length, read all data to EOF
				$this->readBuffer = '';
				return 'done';
			}
		default:
			throw new MWException( __METHOD__.': unexpected state' );
		}
	}

	/**
	 * Parse the HTTP status line. Only status codes 200 and 404 are accepted;
	 * anything else, or a malformed line, disables this client.
	 *
	 * @param $line string Status line without the trailing CRLF
	 * @return void
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		list( , , , $status, $reason ) = $m;
		$status = intval( $status );
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Parse one response header line. Records Content-Length and switches to
	 * the 'body' state on the empty line that ends the headers; other headers
	 * are ignored.
	 *
	 * @param $line string Header line without the trailing CRLF
	 */
	protected function processHeaderLine( $line ) {
		// Whitespace around the field value is optional in HTTP
		if ( preg_match( '/^Content-Length:\s*(\d+)\s*$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Discard the request in flight, if any, and start the next queued one,
	 * or go idle if the queue is empty.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			// Take the oldest queued request without relying on the array's internal pointer
			reset( $this->requests );
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message, prefixed with the class name and host, to the 'squid' debug log.
	 *
	 * @param $msg string
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
	}
}

/**
 * Runs a set of SquidPurgeClient objects in parallel with socket_select()
 * until all of them are idle, no usable socket is left, or the overall
 * timeout expires.
 */
class SquidPurgeClientPool {

	/**
	 * @var array of SquidPurgeClient
	 */
	public $clients = array();

	/** @var int|float Overall time limit for run(), in seconds */
	public $timeout = 5;

	/**
	 * @param $options array Recognised key: 'timeout' (seconds, default 5)
	 */
	function __construct( $options = array() ) {
		if ( isset( $options['timeout'] ) ) {
			$this->timeout = $options['timeout'];
		}
	}

	/**
	 * @param $client SquidPurgeClient
	 * @return void
	 */
	public function addClient( $client ) {
		$this->clients[] = $client;
	}

	/**
	 * Convert the time left before the overall deadline into the
	 * (seconds, microseconds) pair expected by socket_select().
	 *
	 * The wait is capped at one second and never negative. socket_select()
	 * takes the seconds as an integer, so passing the fractional value
	 * directly would truncate anything below one second to zero and turn
	 * the loop in run() into a busy poll.
	 *
	 * @param $remaining int|float Seconds left until the deadline
	 * @return array Two integers: whole seconds and microseconds
	 */
	protected function getSelectTimeout( $remaining ) {
		$wait = max( 0, min( $remaining, 1 ) );
		$seconds = (int)floor( $wait );
		// Truncate rather than round so the result stays below 1000000
		$microseconds = (int)( ( $wait - $seconds ) * 1000000 );
		return array( $seconds, $microseconds );
	}

	/**
	 * Drive all clients until they are idle, then close their sockets.
	 * Stops early when no client offers a socket, when socket_select()
	 * fails, or when the overall timeout is reached.
	 */
	public function run() {
		$done = false;
		$startTime = microtime( true );
		while ( !$done ) {
			// Collect the sockets each client wants to wait on, keyed by
			// "clientIndex/socketIndex" so they can be mapped back afterwards
			$readSockets = $writeSockets = array();
			foreach ( $this->clients as $clientIndex => $client ) {
				$sockets = $client->getReadSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$readSockets["$clientIndex/$i"] = $socket;
				}
				$sockets = $client->getWriteSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$writeSockets["$clientIndex/$i"] = $socket;
				}
			}
			if ( !count( $readSockets ) && !count( $writeSockets ) ) {
				break;
			}
			$exceptSockets = null;
			list( $timeoutSec, $timeoutUsec ) = $this->getSelectTimeout(
				$startTime + $this->timeout - microtime( true ) );
			wfSuppressWarnings();
			$numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
				$timeoutSec, $timeoutUsec );
			wfRestoreWarnings();
			if ( $numReady === false ) {
				wfDebugLog( 'squid', __METHOD__.': Error in socket_select: ' .
					socket_strerror( socket_last_error() ) . "\n" );
				break;
			}
			// Check for timeout, use 1% tolerance since we aimed at having socket_select()
			// exit at precisely the overall timeout
			if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
				wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
				break;
			} elseif ( !$numReady ) {
				continue;
			}

			// socket_select() has reduced the arrays to the sockets that are ready
			foreach ( $readSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doReads();
			}
			foreach ( $writeSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doWrites();
			}

			$done = true;
			foreach ( $this->clients as $client ) {
				if ( !$client->isIdle() ) {
					$done = false;
				}
			}
		}
		foreach ( $this->clients as $client ) {
			$client->close();
		}
	}
}