* partly fixes two issues exposed by bug 34280
[mediawiki.git] / maintenance / locking / LockServerDaemon.php
blob50b939cd11cb9edf87232f7f177579ff6c04c04b
1 <?php
2 /**
3 * @file
4 * @ingroup LockManager Maintenance
5 */
7 /**
8 * This code should not require MediaWiki setup or PHP files.
9 */
// Refuse to run under anything other than the command-line SAPI.
if ( PHP_SAPI !== 'cli' ) {
	exit( "This is not a valid entry point.\n" );
}
error_reporting( E_ALL );

// Run the server...
set_time_limit( 0 ); // the daemon loops forever, so lift the execution time limit

// Long options understood by the daemon: a single ':' marks a required
// value, a double '::' marks an optional value.
$lockServerLongOpts = array(
	'address:', 'port:', 'authKey:',
	'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
);
$lockServerDaemon = LockServerDaemon::init( getopt( '', $lockServerLongOpts ) );
$lockServerDaemon->main();
/**
 * Simple lock server daemon that accepts lock/unlock requests.
 *
 * Requests are newline-terminated lines of the form
 *   <session>:<key>:<command>:<type>:<values>
 * where <key> is sha1( session . command . type . values . authKey ),
 * <session> is 31 characters long, <command> is one of ACQUIRE, RELEASE,
 * RELEASE_ALL or STAT, <type> is SH or EX (checked for ACQUIRE/RELEASE only)
 * and <values> is a '|' separated list of 31 character resource keys.
 * Each request is answered with a single status line.
 */
class LockServerDaemon {
	/** @var resource */
	protected $sock; // socket to listen/accept on
	/** @var Array */
	protected $sessions = array(); // (session => resource)
	/** @var Array */
	protected $deadSessions = array(); // (session => UNIX timestamp)

	/** @var LockHolder */
	protected $lockHolder;

	protected $address; // string IP address
	protected $port; // integer
	protected $authKey; // string key
	protected $lockTimeout; // integer number of seconds
	protected $maxBacklog; // integer
	protected $maxClients; // integer

	protected $startTime; // integer UNIX timestamp
	protected $ticks = 0; // integer counter

	/** @var LockServerDaemon */
	protected static $instance = null;

	/**
	 * Create the single daemon instance from parsed command line options.
	 * Terminates the script with a usage message if a required option
	 * (address, port, authKey) is missing.
	 *
	 * @param $config Array Options as returned by getopt()
	 * @return LockServerDaemon
	 * @throws Exception If the daemon was already initialized
	 */
	public static function init( array $config ) {
		if ( self::$instance ) {
			throw new Exception( 'LockServer already initialized.' );
		}
		foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
			if ( !isset( $config[$par] ) ) {
				// Option names are case-sensitive, and getopt() only accepts
				// values of optional ("::") options in the --name=value form.
				die( "Usage: php LockServerDaemon.php " .
					"--address <address> --port <port> --authKey <key> " .
					"[--lockTimeout=<seconds>] " .
					"[--maxLocks=<integer>] [--maxClients=<integer>] [--maxBacklog=<integer>]\n"
				);
			}
		}
		self::$instance = new self( $config );
		return self::$instance;
	}

	/**
	 * @param $config Array Options as returned by getopt()
	 */
	protected function __construct( array $config ) {
		// Required parameters...
		$this->address = $config['address'];
		$this->port = $config['port'];
		$this->authKey = $config['authKey'];
		// Parameters with defaults...
		$this->lockTimeout = self::intOption( $config, 'lockTimeout', 60 );
		$this->maxClients = self::intOption( $config, 'maxClients', 1000 ); // less than default FD_SETSIZE
		$this->maxBacklog = self::intOption( $config, 'maxBacklog', 100 );
		$maxLocks = self::intOption( $config, 'maxLocks', 10000 );

		$this->lockHolder = new LockHolder( $maxLocks );
	}

	/**
	 * Read an optional integer option, falling back to a default.
	 *
	 * getopt() reports an option that was given without a value as boolean
	 * false; that case is treated like an absent option instead of being
	 * cast to 0.
	 *
	 * @param $config Array
	 * @param $name string Option name
	 * @param $default integer
	 * @return integer
	 */
	protected static function intOption( array $config, $name, $default ) {
		if ( isset( $config[$name] ) && $config[$name] !== false ) {
			return (int)$config[$name];
		}
		return $default;
	}

	/**
	 * Create, bind and start listening on the non-blocking server socket,
	 * and record the daemon start time.
	 *
	 * @return void
	 * @throws Exception If the sockets extension is missing or a socket call fails
	 */
	protected function setupServerSocket() {
		if ( !function_exists( 'socket_create' ) ) {
			throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
		}
		$sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $sock === false ) {
			throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
		}
		socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // allow quick re-bind after restart
		socket_set_nonblock( $sock ); // don't block on accept()
		if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
			throw new Exception( "socket_bind(): " .
				socket_strerror( socket_last_error( $sock ) ) );
		} elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) {
			throw new Exception( "socket_listen(): " .
				socket_strerror( socket_last_error( $sock ) ) );
		}
		$this->sock = $sock;
		$this->startTime = time();
	}

	/**
	 * Entry-point function that listens to the server socket, accepts
	 * new clients, and receives/responds to requests to lock resources.
	 * This method loops forever.
	 *
	 * @return void
	 */
	public function main() {
		$this->setupServerSocket(); // setup listening socket
		$socketArray = new SocketArray(); // sockets being serviced
		$socketArray->addSocket( $this->sock ); // add listening socket
		do {
			list( $read, $write ) = $socketArray->socketsForSelect();
			// socket_select() takes its arrays by reference, so the "except"
			// argument must be a real variable rather than an expression.
			$except = null;
			if ( socket_select( $read, $write, $except, null ) < 1 ) {
				continue; // wait
			}
			// Check if there is a client trying to connect...
			if ( in_array( $this->sock, $read, true )
				&& $socketArray->size() < $this->maxClients
			) {
				$this->acceptClient( $socketArray );
			}
			// Loop through all the clients that have data to read...
			foreach ( $read as $read_sock ) {
				if ( $read_sock === $this->sock ) {
					continue; // skip listening socket
				}
				$this->serviceReadableSocket( $socketArray, $read_sock );
			}
			// Loop through all the clients that have data to write...
			foreach ( $write as $write_sock ) {
				$this->serviceWritableSocket( $socketArray, $write_sock );
			}
			// Prune dead locks every few socket events...
			if ( ++$this->ticks >= 9 ) {
				$this->ticks = 0;
				$this->purgeExpiredLocks();
			}
		} while ( true );
	}

	/**
	 * Accept one pending connection on the listening socket and start
	 * servicing it.
	 *
	 * @param $socketArray SocketArray
	 * @return void
	 */
	protected function acceptClient( SocketArray $socketArray ) {
		$newSock = socket_accept( $this->sock );
		if ( $newSock ) {
			socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
			socket_set_nonblock( $newSock ); // don't block on read()/write()
			$socketArray->addSocket( $newSock );
		}
	}

	/**
	 * Read pending data from a client. A chunk ending in a newline completes
	 * a request, which is executed and its response queued for sending; any
	 * other chunk is buffered. The client is dropped on disconnect or when
	 * its receive buffer would overflow.
	 *
	 * @param $socketArray SocketArray
	 * @param $read_sock resource
	 * @return void
	 */
	protected function serviceReadableSocket( SocketArray $socketArray, $read_sock ) {
		// Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
		$data = socket_read( $read_sock, 65535 );
		if ( $data === false || $data === '' ) {
			// The client is disconnected
			$socketArray->closeSocket( $read_sock );
			$this->recordDeadSocket( $read_sock ); // remove session
		} elseif ( substr( $data, -1 ) === "\n" ) {
			// Newline is the last char (given ping-pong message usage)
			$cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
			// Perform the requested command...
			$response = $this->doCommand( rtrim( $cmd ), $read_sock );
			// Send the response to the client...
			$socketArray->appendSndBuffer( $read_sock, $response . "\n" );
		} elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
			// More message data arrived but the buffer is too big
			$socketArray->closeSocket( $read_sock );
			$this->recordDeadSocket( $read_sock ); // remove session
		}
	}

	/**
	 * Try to flush a client's pending response data, dropping the client
	 * if the write fails.
	 *
	 * @param $socketArray SocketArray
	 * @param $write_sock resource
	 * @return void
	 */
	protected function serviceWritableSocket( SocketArray $socketArray, $write_sock ) {
		$bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
		if ( $bytes === false ) {
			// The client is disconnected
			$socketArray->closeSocket( $write_sock );
			$this->recordDeadSocket( $write_sock ); // remove session
		} else {
			// Truncate the bytes actually sent from the start of the write buffer
			$socketArray->consumeSndBuffer( $write_sock, $bytes );
		}
	}

	/**
	 * Parse and execute one request line.
	 *
	 * @param $data string Request line without the trailing newline
	 * @param $sourceSock resource Socket the request arrived on
	 * @return string Status/response string
	 */
	protected function doCommand( $data, $sourceSock ) {
		$cmdArr = $this->getCommand( $data );
		if ( is_string( $cmdArr ) ) {
			return $cmdArr; // error
		}
		list( $function, $session, $type, $resources ) = $cmdArr;
		// On first command, track the session => sock correspondence
		if ( !isset( $this->sessions[$session] ) ) {
			$this->sessions[$session] = $sourceSock;
			unset( $this->deadSessions[$session] ); // renew if dead
		}
		if ( $function === 'ACQUIRE' ) {
			return $this->lockHolder->lock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE' ) {
			return $this->lockHolder->unlock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE_ALL' ) {
			return $this->lockHolder->release( $session );
		} elseif ( $function === 'STAT' ) {
			return $this->stat();
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Validate a request line and split it into its components.
	 *
	 * @param $data string
	 * @return Array|string (command, session, type, values) on success,
	 *   or one of the error strings BAD_KEY, BAD_SESSION, BAD_COMMAND,
	 *   BAD_TYPE, BAD_FORMAT
	 */
	protected function getCommand( $data ) {
		$m = explode( ':', $data ); // <session, key, command, type, values>
		if ( count( $m ) == 5 ) {
			list( $session, $key, $command, $type, $values ) = $m;
			// NOTE(review): the key check uses a plain string comparison,
			// which is not constant-time.
			if ( sha1( $session . $command . $type . $values . $this->authKey ) !== $key ) {
				return 'BAD_KEY';
			} elseif ( strlen( $session ) !== 31 ) {
				return 'BAD_SESSION';
			}
			$values = explode( '|', $values );
			if ( $command === 'ACQUIRE' ) {
				$needsLockArgs = true;
			} elseif ( $command === 'RELEASE' ) {
				$needsLockArgs = true;
			} elseif ( $command === 'RELEASE_ALL' ) {
				$needsLockArgs = false;
			} elseif ( $command === 'STAT' ) {
				$needsLockArgs = false;
			} else {
				return 'BAD_COMMAND';
			}
			if ( $needsLockArgs ) {
				if ( $type !== 'SH' && $type !== 'EX' ) {
					return 'BAD_TYPE';
				}
				foreach ( $values as $value ) {
					if ( strlen( $value ) !== 31 ) {
						return 'BAD_FORMAT';
					}
				}
			}
			return array( $command, $session, $type, $values );
		}
		return 'BAD_FORMAT';
	}

	/**
	 * Remove a socket's corresponding session from tracking and
	 * store it in the dead session tracking if it still has locks.
	 *
	 * @param $socket resource
	 * @return bool Whether a session was tracked for this socket
	 */
	protected function recordDeadSocket( $socket ) {
		// Strict search: a handle may only ever match itself
		$session = array_search( $socket, $this->sessions, true );
		if ( $session !== false ) {
			unset( $this->sessions[$session] );
			// Record recently killed sessions that still have locks
			if ( $this->lockHolder->sessionHasLocks( $session ) ) {
				$this->deadSessions[$session] = time();
			}
			return true;
		}
		return false;
	}

	/**
	 * Clear locks for sessions that have been dead for a while
	 *
	 * @return integer Number of sessions purged
	 */
	protected function purgeExpiredLocks() {
		$count = 0;
		$now = time();
		foreach ( $this->deadSessions as $session => $timestamp ) {
			if ( ( $now - $timestamp ) > $this->lockTimeout ) {
				$this->lockHolder->release( $session );
				unset( $this->deadSessions[$session] );
				++$count;
			}
		}
		return $count;
	}

	/**
	 * Get the daemon uptime in seconds and the current memory usage
	 *
	 * @return string "<uptime seconds>:<memory usage in bytes>"
	 */
	protected function stat() {
		return ( time() - $this->startTime ) . ':' . memory_get_usage();
	}
}
/**
 * LockServerDaemon helper class that keeps track of socket states.
 *
 * Every registered socket has a receive buffer and a send buffer, stored
 * in parallel arrays that share the same keys as the socket list.
 */
class SocketArray {
	/** @var Array */
	protected $clients = array(); // array of client sockets
	/** @var Array */
	protected $rBuffers = array(); // corresponding socket read buffers
	/** @var Array */
	protected $wBuffers = array(); // corresponding socket write buffers

	const BUFFER_SIZE = 65535;

	/**
	 * Split the sockets into those to poll for reading and those to poll
	 * for writing. A socket with pending outgoing data is only polled for
	 * writing; all others are only polled for reading.
	 *
	 * @return Array (list of sockets to read, list of sockets to write)
	 */
	public function socketsForSelect() {
		$rSockets = array();
		$wSockets = array();
		foreach ( $this->clients as $key => $socket ) {
			if ( $this->wBuffers[$key] !== '' ) {
				$wSockets[] = $socket; // wait for writing to unblock
			} else {
				$rSockets[] = $socket; // wait for reading to unblock
			}
		}
		return array( $rSockets, $wSockets );
	}

	/**
	 * @return integer Number of registered sockets
	 */
	public function size() {
		return count( $this->clients );
	}

	/**
	 * Register a socket with empty receive and send buffers.
	 *
	 * @param $sock resource
	 * @return bool Always true
	 */
	public function addSocket( $sock ) {
		$this->clients[] = $sock;
		$this->rBuffers[] = '';
		$this->wBuffers[] = '';
		return true;
	}

	/**
	 * Close a registered socket and discard its buffers.
	 *
	 * @param $sock resource
	 * @return bool False if the socket is not registered
	 */
	public function closeSocket( $sock ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		}
		socket_close( $sock );
		unset( $this->clients[$key] );
		unset( $this->rBuffers[$key] );
		unset( $this->wBuffers[$key] );
		return true;
	}

	/**
	 * Append data to a socket's receive buffer.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is not registered or the buffer
	 *   would grow beyond BUFFER_SIZE bytes (nothing is appended then)
	 */
	public function appendRcvBuffer( $sock, $data ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->rBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Fetch and clear a socket's receive buffer.
	 *
	 * @param $sock resource
	 * @return string|bool Buffered data, or false if the socket is not registered
	 */
	public function readRcvBuffer( $sock ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		}
		$data = $this->rBuffers[$key];
		$this->rBuffers[$key] = ''; // consume data
		return $data;
	}

	/**
	 * Append data to a socket's send buffer.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is not registered or the buffer
	 *   would grow beyond BUFFER_SIZE bytes (nothing is appended then)
	 */
	public function appendSndBuffer( $sock, $data ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->wBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Peek at a socket's send buffer without consuming it.
	 *
	 * @param $sock resource
	 * @return string|bool Pending data, or false if the socket is not registered
	 */
	public function readSndBuffer( $sock ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		}
		return $this->wBuffers[$key];
	}

	/**
	 * Drop the given number of bytes from the start of a socket's send buffer.
	 *
	 * @param $sock resource
	 * @param $bytes integer
	 * @return bool False if the socket is not registered
	 */
	public function consumeSndBuffer( $sock, $bytes ) {
		$key = $this->findKey( $sock );
		if ( $key === false ) {
			return false;
		}
		// Cast because substr() can return false instead of '' on older PHP
		$this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
		return true;
	}

	/**
	 * Look up the internal key of a registered socket.
	 *
	 * The search is strict so that a handle can only match itself, however
	 * the handle type behaves under loose comparison.
	 *
	 * @param $sock resource
	 * @return integer|bool Key into the socket/buffer arrays, or false if absent
	 */
	protected function findKey( $sock ) {
		return array_search( $sock, $this->clients, true );
	}
}
/**
 * LockServerDaemon helper class that keeps track of the locks.
 *
 * Shared (SH) and exclusive (EX) locks are stored per resource key, with
 * a reverse index per session so that all locks of a session can be found
 * and released. The total number of held locks is capped at $maxLocks.
 */
class LockHolder {
	/** @var Array */
	protected $shLocks = array(); // (key => session => 1)
	/** @var Array */
	protected $exLocks = array(); // (key => session)

	/** @var Array */
	protected $sessionIndexSh = array(); // (session => key => 1)
	/** @var Array */
	protected $sessionIndexEx = array(); // (session => key => 1)
	protected $lockCount = 0; // integer

	protected $maxLocks; // integer

	/**
	 * @param $maxLocks integer Maximum number of locks to allow
	 */
	public function __construct( $maxLocks ) {
		$this->maxLocks = $maxLocks;
	}

	/**
	 * @param $session string
	 * @return bool Whether the session holds any shared or exclusive lock
	 */
	public function sessionHasLocks( $session ) {
		return isset( $this->sessionIndexSh[$session] )
			|| isset( $this->sessionIndexEx[$session] );
	}

	/**
	 * Acquire locks of one type on all of the given keys, or on none of them.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string ACQUIRED, CANT_ACQUIRE, TOO_MANY_LOCKS or INTERNAL_ERROR
	 */
	public function lock( $session, $type, array $keys ) {
		// Conservative pre-check: every requested key counts as a new lock
		if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
			return 'TOO_MANY_LOCKS';
		}
		if ( $type === 'SH' ) {
			// Check if any keys are already write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
			}
			// Acquire the read-locks...
			foreach ( $keys as $key ) {
				$this->set_sh_lock( $key, $session );
			}
			return 'ACQUIRED';
		} elseif ( $type === 'EX' ) {
			// Check if any keys are already read-locked or write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
				if ( isset( $this->shLocks[$key] ) ) {
					foreach ( $this->shLocks[$key] as $otherSession => $x ) {
						if ( $otherSession !== $session ) {
							return 'CANT_ACQUIRE';
						}
					}
				}
			}
			// Acquire the write-locks...
			foreach ( $keys as $key ) {
				$this->set_ex_lock( $key, $session );
			}
			return 'ACQUIRED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release the session's locks of one type on the given keys. Keys the
	 * session does not hold such a lock on are ignored.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string RELEASED or INTERNAL_ERROR
	 */
	public function unlock( $session, $type, array $keys ) {
		if ( $type === 'SH' ) {
			foreach ( $keys as $key ) {
				$this->unset_sh_lock( $key, $session );
			}
			return 'RELEASED';
		} elseif ( $type === 'EX' ) {
			foreach ( $keys as $key ) {
				$this->unset_ex_lock( $key, $session );
			}
			return 'RELEASED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release every shared and exclusive lock held by a session.
	 *
	 * @param $session string
	 * @return string RELEASED_ALL
	 */
	public function release( $session ) {
		if ( isset( $this->sessionIndexSh[$session] ) ) {
			foreach ( $this->sessionIndexSh[$session] as $key => $x ) {
				$this->unset_sh_lock( $key, $session );
			}
		}
		if ( isset( $this->sessionIndexEx[$session] ) ) {
			foreach ( $this->sessionIndexEx[$session] as $key => $x ) {
				$this->unset_ex_lock( $key, $session );
			}
		}
		return 'RELEASED_ALL';
	}

	/**
	 * Record a shared lock; a no-op if the session already holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_sh_lock( $key, $session ) {
		if ( !isset( $this->shLocks[$key][$session] ) ) {
			$this->shLocks[$key][$session] = 1;
			$this->sessionIndexSh[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Record an exclusive lock; a no-op if the session already holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_ex_lock( $key, $session ) {
		// $this->exLocks[$key] is the holder's session string, so compare it
		// directly. Indexing it with the session name never reports the lock
		// as held on PHP >= 5.4 and would count a re-acquired lock twice.
		if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
			return;
		}
		$this->exLocks[$key] = $session;
		$this->sessionIndexEx[$session][$key] = 1;
		++$this->lockCount; // we are adding a lock
	}

	/**
	 * Remove a shared lock if the session holds it, pruning empty index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_sh_lock( $key, $session ) {
		if ( isset( $this->shLocks[$key][$session] ) ) {
			unset( $this->shLocks[$key][$session] );
			if ( !count( $this->shLocks[$key] ) ) {
				unset( $this->shLocks[$key] );
			}
			unset( $this->sessionIndexSh[$session][$key] );
			if ( !count( $this->sessionIndexSh[$session] ) ) {
				unset( $this->sessionIndexSh[$session] );
			}
			--$this->lockCount;
		}
	}

	/**
	 * Remove an exclusive lock if the session holds it, pruning empty index entries.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_ex_lock( $key, $session ) {
		if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
			unset( $this->exLocks[$key] );
			unset( $this->sessionIndexEx[$session][$key] );
			if ( !count( $this->sessionIndexEx[$session] ) ) {
				unset( $this->sessionIndexEx[$session] );
			}
			--$this->lockCount;
		}
	}
}