3 * Pick a database that has pending jobs
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
20 * @todo Make this work on PostgreSQL and maybe other database servers
21 * @ingroup Maintenance
// Pull in the Maintenance base class, which lives next to this script.
require_once dirname( __FILE__ ) . '/Maintenance.php';
/**
 * Maintenance script that prints the name of one database that has pending
 * jobs, optionally restricted to a single job type.
 *
 * The map of job type => databases is cached in $wgMemc under the key
 * 'jobqueue:dbs:v2' for 300 seconds.
 */
class nextJobDB extends Maintenance {
	/**
	 * Set the script description and register the 'type' option.
	 */
	public function __construct() {
		parent::__construct();
		$this->mDescription = "Pick a database that has pending jobs";
		$this->addOption( 'type', "The type of job to search for", false, true );
	}

	/**
	 * Pick a random database with a pending job and print its name followed
	 * by a newline. Prints nothing when no suitable database is found.
	 */
	public function execute() {
		global $wgMemc;

		$type = $this->getOption( 'type', false );

		$memcKey = 'jobqueue:dbs:v2';
		$pendingDBs = $wgMemc->get( $memcKey );

		// If the cache entry wasn't present, or in 1% of cases otherwise,
		// regenerate the cache.
		if ( !$pendingDBs || mt_rand( 0, 100 ) === 0 ) {
			$pendingDBs = $this->getPendingDbs();
			$wgMemc->set( $memcKey, $pendingDBs, 300 );
		}

		if ( !$pendingDBs ) {
			// Nothing is pending anywhere.
			return;
		}

		// Each failed iteration removes one database from $pendingDBs, so the
		// loop ends once a live candidate is found or none are left.
		while ( true ) {
			if ( $type === false ) {
				// array_values() drops the job-type keys; with string keys
				// PHP 8 would treat them as named arguments to array_merge().
				$candidates = call_user_func_array( 'array_merge', array_values( $pendingDBs ) );
			} elseif ( isset( $pendingDBs[$type] ) ) {
				$candidates = $pendingDBs[$type];
			} else {
				$candidates = array();
			}

			if ( !$candidates ) {
				return;
			}

			// array_diff() preserves keys, so reindex before picking by offset.
			$candidates = array_values( $candidates );
			$db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];

			if ( $this->checkJob( $type, $db ) ) {
				$this->output( $db . "\n" );
				return;
			}

			// This job is not available in the current database. Remove it from
			// the cache and try another candidate.
			if ( $type === false ) {
				foreach ( $pendingDBs as $type2 => $dbs ) {
					$pendingDBs[$type2] = array_diff( $dbs, array( $db ) );
				}
			} else {
				$pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
			}

			$wgMemc->set( $memcKey, $pendingDBs, 300 );
		}
	}

	/**
	 * Check if the specified database has a job of the specified type in it.
	 * The type may be false to indicate "all".
	 *
	 * @param $type string|bool Job type (job_cmd), or false for any type
	 * @param $dbName string
	 * @return bool True if at least one matching row exists in the job table
	 */
	function checkJob( $type, $dbName ) {
		$lb = wfGetLB( $dbName );
		$db = $lb->getConnection( DB_MASTER, array(), $dbName );

		if ( $type === false ) {
			// No restriction: any row in the job table counts.
			$conds = array();
		} else {
			$conds = array( 'job_cmd' => $type );
		}

		$exists = (bool)$db->selectField( 'job', '1', $conds, __METHOD__ );
		$lb->reuseConnection( $db );

		return $exists;
	}

	/**
	 * Get all databases that have a pending job
	 *
	 * @return array Map of job type (job_cmd) => list of wiki IDs that have
	 *   at least one job of that type
	 */
	private function getPendingDbs() {
		global $wgLocalDatabases;

		$pendingDBs = array();

		# Cross-reference DBs by master DB server
		$dbsByMaster = array();
		foreach ( $wgLocalDatabases as $db ) {
			$lb = wfGetLB( $db );
			$dbsByMaster[$lb->getServerName( 0 )][] = $db;
		}

		foreach ( $dbsByMaster as $dbs ) {
			$dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );

			# Padding row for MySQL bug
			$pad = str_repeat( '-', 40 );
			$sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
			foreach ( $dbs as $wikiId ) {
				list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
				$dbConn->tablePrefix( $tablePrefix );
				$jobTable = $dbConn->tableName( 'job' );

				// One query per master: chain a sub-select for every wiki.
				$sql .= ' UNION ';
				$sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
			}

			$res = $dbConn->query( $sql, __METHOD__ );
			foreach ( $res as $row ) {
				if ( $row->db === $pad ) {
					// discard padding row
					continue;
				}
				$pendingDBs[$row->job_cmd][] = $row->db;
			}
		}

		return $pendingDBs;
	}
}
// Name of the class the maintenance runner should instantiate; the include
// below hands control to that runner.
$maintClass = "nextJobDb";
require_once RUN_MAINTENANCE_IF_MAIN;