3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; either version 2 of the License, or
6 * (at your option) any later version.
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 * http://www.gnu.org/copyleft/gpl.html
20 * @author Aaron Schulz
23 use Psr\Log\LoggerAwareInterface
;
24 use Psr\Log\LoggerInterface
;
25 use Psr\Log\NullLogger
;
26 use Wikimedia\ScopedCallback
;
/**
 * Class for scanning through chronological, log-structured data or change logs
 * and locally purging cache keys related to entities that appear in this data.
 *
 * This is useful for repairing cache when purges are missed by using a reliable
 * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
 * is expected to be more common than within them.
 */
class WANObjectCacheReaper implements LoggerAwareInterface {
	/** @var WANObjectCache */
	protected $cache;
	/** @var BagOStuff */
	protected $store;
	/** @var callable */
	protected $logChunkCallback;
	/** @var callable */
	protected $keyListCallback;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string */
	protected $channel;
	/** @var int */
	protected $initialStartWindow;

	/**
	 * @param WANObjectCache $cache Cache to reap bad keys from
	 * @param BagOStuff $store Cache used to store the reap position and for locking
	 * @param callable $logCallback Callback taking arguments:
	 *          - The starting position as a UNIX timestamp
	 *          - The starting unique ID used for breaking timestamp collisions or null
	 *          - The ending position as a UNIX timestamp
	 *          - The maximum number of results to return
	 *        It returns a list of maps of (item: event object, pos: UNIX timestamp,
	 *        id: unique ID), one per event, with the corresponding event timestamp/ID
	 *        information. The "item" value is what gets passed to $keyCallback.
	 *        The events should be in ascending order, by (timestamp,id).
	 * @param callable $keyCallback Callback taking arguments:
	 *          - The WANObjectCache instance
	 *          - An object from the event log (the "item" field of an event)
	 *        It should return a list of WAN cache keys.
	 *        The callback must fully duck-type test the object, since it can be any model class.
	 * @param array $params Additional options:
	 *          - channel: the name of the update event stream. Required.
	 *          - initialStartWindow: seconds back in time to start if the position is lost.
	 *            Default: 3600 (1 hour).
	 *          - logger: a PSR-3 LoggerInterface instance [optional]
	 * @throws UnexpectedValueException If no channel is specified
	 */
	public function __construct(
		WANObjectCache $cache,
		BagOStuff $store,
		callable $logCallback,
		callable $keyCallback,
		array $params
	) {
		$this->cache = $cache;
		$this->store = $store;

		$this->logChunkCallback = $logCallback;
		$this->keyListCallback = $keyCallback;

		if ( !isset( $params['channel'] ) ) {
			throw new UnexpectedValueException( "No channel specified." );
		}
		$this->channel = $params['channel'];

		$this->initialStartWindow = isset( $params['initialStartWindow'] )
			? $params['initialStartWindow']
			: 3600;
		$this->logger = isset( $params['logger'] )
			? $params['logger']
			: new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * Check and reap stale keys based on a chunk of events
	 *
	 * The stored position is never advanced past the latest event of a key whose
	 * reap failed, so that key is retried on the next call instead of being skipped.
	 * Events that map to no keys still advance the position.
	 *
	 * @param int $n Number of events
	 * @return int Number of keys successfully reaped; 0 if another process holds the lock
	 */
	final public function invoke( $n = 100 ) {
		$posKey = $this->getPositionKey();
		$scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
		if ( !$scopeLock ) {
			// Another process is already reaping this channel
			return 0;
		}

		$now = time();
		$status = $this->store->get( $posKey );
		if ( !$status ) {
			// Position never set or lost; start a fixed window back in time
			$status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
		}

		// Get events for entities whose keys' tombstones/hold-off should have expired by now
		$events = call_user_func_array(
			$this->logChunkCallback,
			[ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ]
		);

		// Map each key to its latest event. Re-inserting moves the key to the end,
		// so iteration order is ascending by each key's latest (pos, id).
		$keyEvents = [];
		foreach ( $events as $event ) {
			$keys = call_user_func_array(
				$this->keyListCallback,
				[ $this->cache, $event['item'] ]
			);
			foreach ( $keys as $key ) {
				unset( $keyEvents[$key] ); // use only the latest per key
				$keyEvents[$key] = [ 'pos' => $event['pos'], 'id' => $event['id'] ];
			}
		}

		$purgeCount = 0;
		$failedCoord = null;
		foreach ( $keyEvents as $key => $keyEvent ) {
			if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
				$failedCoord = [ $keyEvent['pos'], $keyEvent['id'] ];
				break;
			}
			++$purgeCount;
		}

		// Find the last event that is safe to checkpoint at. Every event strictly before
		// the failed key's latest event either had all of its keys reaped, or its keys
		// have a later event (>= $failedCoord) that will be fetched again next time.
		$lastOkEvent = null;
		foreach ( $events as $event ) {
			if ( $failedCoord !== null && [ $event['pos'], $event['id'] ] >= $failedCoord ) {
				break;
			}
			$lastOkEvent = $event;
		}

		if ( $lastOkEvent ) {
			$ok = $this->store->merge(
				$posKey,
				function ( $bag, $key, $curValue ) use ( $lastOkEvent ) {
					if ( $curValue ) {
						$curCoord = [ $curValue['pos'], $curValue['id'] ];
						$newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ];
						if ( $newCoord < $curCoord ) {
							// Keep prior position instead of rolling it back
							return $curValue;
						}
					}

					return [
						'pos' => $lastOkEvent['pos'],
						'id' => $lastOkEvent['id'],
						'ctime' => $curValue ? $curValue['ctime'] : date( 'c' )
					];
				},
				IExpiringStore::TTL_INDEFINITE
			);

			$pos = $lastOkEvent['pos'];
			$id = $lastOkEvent['id'];
			if ( $ok ) {
				$this->logger->info( "Updated cache reap position ($pos, $id)." );
			} else {
				$this->logger->error( "Could not update cache reap position ($pos, $id)." );
			}
		}

		ScopedCallback::consume( $scopeLock );

		return $purgeCount;
	}

	/**
	 * Get the stored reap position for this channel
	 *
	 * @return array|bool Returns (pos, id, ctime) map or false if not set
	 */
	public function getState() {
		return $this->store->get( $this->getPositionKey() );
	}

	/**
	 * @return string Key in the position store under which this channel's reap position lives
	 */
	private function getPositionKey() {
		return $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
	}
}