1 package MogileFS
::Store
;
5 use MogileFS
::Util
qw(throw max error);
6 use DBI
; # no reason a Store has to be DBI-based, but for now they all are.
9 # this is incremented whenever the schema changes. server will refuse
10 # to start-up with an old schema version
12 # 6: adds file_to_replicate table
13 # 7: adds file_to_delete_later table
14 # 8: adds fsck_log table
15 # 9: adds 'drain' state to enum in device table
16 # 10: adds 'replpolicy' column to 'class' table
17 # 11: adds 'file_to_queue' table
18 # 12: adds 'file_to_delete2' table
19 # 13: modifies 'server_settings.value' to TEXT for wider values
20 # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21 # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22 use constant SCHEMA_VERSION
=> 14;
26 return $class->new_from_dsn_user_pass(map { MogileFS
->config($_) } qw(db_dsn db_user db_pass max_handles));
29 sub new_from_dsn_user_pass
{
30 my ($class, $dsn, $user, $pass, $max_handles) = @_;
32 if ($dsn =~ /^DBI:mysql:/i) {
33 $subclass = "MogileFS::Store::MySQL";
34 } elsif ($dsn =~ /^DBI:SQLite:/i) {
35 $subclass = "MogileFS::Store::SQLite";
36 } elsif ($dsn =~ /^DBI:Oracle:/i) {
37 $subclass = "MogileFS::Store::Oracle";
38 } elsif ($dsn =~ /^DBI:Pg:/i) {
39 $subclass = "MogileFS::Store::Postgres";
41 die "Unknown database type: $dsn";
43 unless (eval "use $subclass; 1") {
44 die "Error loading $subclass: $@\n";
50 max_handles
=> $max_handles, # Max number of handles to allow
51 raise_errors
=> $subclass->want_raise_errors,
52 slave_list_cachetime
=> 0,
53 slave_list_cache
=> [],
54 recheck_req_gen
=> 0, # incremented generation, of recheck of dbh being requested
55 recheck_done_gen
=> 0, # once recheck is done, copy of what the request generation was
56 handles_left
=> 0, # amount of times this handle can still be verified
57 server_setting_cache
=> {}, # value-agnostic db setting cache.
63 # Defaults to true now.
64 sub want_raise_errors
{
68 sub new_from_mogdbsetup
{
69 my ($class, %args) = @_;
70 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
71 my $dsn = $class->dsn_of_dbhost($args{dbname
}, $args{dbhost
}, $args{dbport
});
73 my $try_make_sto = sub {
74 my $dbh = DBI
->connect($dsn, $args{dbuser
}, $args{dbpass
}, {
77 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser
}, $args{dbpass
});
82 # upgrading, apparently, as this database already exists.
83 my $sto = $try_make_sto->();
86 # otherwise, we need to make the requested database, setup permissions, etc
87 $class->status("couldn't connect to database as mogilefs user. trying root...");
88 my $rootdsn = $class->dsn_of_root($args{dbname
}, $args{dbhost
}, $args{dbport
});
89 my $rdbh = DBI
->connect($rootdsn, $args{dbrootuser
}, $args{dbrootpass
}, {
92 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI
->errstr . "\n";
93 $class->status("connected to database as root user.");
95 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
96 $class->create_db_if_not_exists($rdbh, $args{dbname
});
97 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
98 $class->grant_privileges($rdbh, $args{dbname
}, $args{dbuser
}, $args{dbpass
});
100 # should be ready now:
101 $sto = $try_make_sto->();
104 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
107 # given a root DBI connection, create the named database. succeed
# if it's made, or already exists. die otherwise.
109 sub create_db_if_not_exists
{
110 my ($pkg, $rdbh, $dbname) = @_;
111 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
112 or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
115 sub grant_privileges
{
116 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
117 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
119 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
120 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
122 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
# Capability flags: concrete Store subclasses override these to advertise
# which SQL features their database supports.
sub can_replace      { return 0; }  # REPLACE INTO
sub can_insertignore { return 0; }  # INSERT IGNORE
sub can_insert_multi { return 0; }  # multi-row INSERT
sub can_for_update   { return 1; }  # SELECT ... FOR UPDATE
# SQL expression yielding the database's current unix time.  There is no
# portable default, so every concrete Store subclass must override this.
sub unix_timestamp {
    die "No function in $_[0] to return DB's unixtime.";
}
134 return "INSERT IGNORE " if $self->can_insertignore;
135 return "REPLACE " if $self->can_replace;
136 die "Can't INSERT IGNORE or REPLACE?";
# Pluggable progress/confirmation hooks used by mogdbsetup.  By default,
# status messages are discarded and every confirmation is approved.
my $on_status  = sub {};
my $on_confirm = sub { 1 };

# Install a callback to receive status messages.
sub on_status {
    my ($pkg, $code) = @_;
    $on_status = $code;
}

# Install a callback that approves (true) or rejects (false) prompts.
sub on_confirm {
    my ($pkg, $code) = @_;
    $on_confirm = $code;
}

# Report a progress message through the installed hook.
sub status {
    my ($pkg, $msg) = @_;
    $on_status->($msg);
}

# Ask the installed hook for permission; dies if it declines.
sub confirm {
    my ($pkg, $msg) = @_;
    $on_confirm->($msg)
        or die "Aborted.\n";
}
# Class-method wrapper so callers can query the newest schema version
# this server code understands (the SCHEMA_VERSION constant above).
sub latest_schema_version {
    return SCHEMA_VERSION;
}
150 $self->{raise_errors
} = 1;
151 $self->dbh->{RaiseError
} = 1;
# Read-only accessors for the connection parameters this Store was
# constructed with.
sub dsn  { my ($self) = @_; return $self->{dsn};  }
sub user { my ($self) = @_; return $self->{user}; }
sub pass { my ($self) = @_; return $self->{pass}; }
# Hook invoked right after a fresh DBI connection is established;
# subclasses may override for per-connection setup.  Default is a no-op.
sub post_dbi_connect { return 1; }

# Whether this Store implementation supports read slaves; off by default.
sub can_do_slaves { return 0; }
165 die "Incapable of becoming slave." unless $self->can_do_slaves;
172 return $self->{slave
};
175 # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
180 # only reload every 15 seconds.
181 if ($self->{slave_list_cachetime
} > $now - 15) {
182 return @
{$self->{slave_list_cache
}};
184 $self->{slave_list_cachetime
} = $now;
185 $self->{slave_list_cache
} = [];
187 my $sk = MogileFS
::Config
->server_setting('slave_keys')
191 foreach my $key (split /\s*,\s*/, $sk) {
192 my $slave = MogileFS
::Config
->server_setting("slave_$key");
195 error
("key for slave DB config: slave_$key not found in configuration");
199 my ($dsn, $user, $pass) = split /\|/, $slave;
200 if (!defined($dsn) or !defined($user) or !defined($pass)) {
201 error
("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
204 push @ret, [$dsn, $user, $pass]
207 $self->{slave_list_cache
} = \
@ret;
214 die "Incapable of having slaves." unless $self->can_do_slaves;
216 return $self->{slave
} if $self->check_slave;
218 my @slaves_list = $self->_slaves_list;
220 # If we have no slaves, then return silently.
221 return unless @slaves_list;
223 foreach my $slave_fulldsn (@slaves_list) {
224 my $newslave = $self->{slave
} = $self->new_from_dsn_user_pass(@
$slave_fulldsn);
225 $self->{slave_next_check
} = 0;
226 $newslave->mark_as_slave;
228 if $self->check_slave;
231 warn "Slave list exhausted, failing back to master.";
238 return $self unless $self->can_do_slaves;
240 if ($self->{slave_ok
}) {
241 if (my $slave = $self->get_slave) {
242 $slave->{recheck_req_gen
} = $self->{recheck_req_gen
};
254 return unless ref $coderef eq 'CODE';
256 local $self->{slave_ok
} = 1;
258 return $coderef->(@_);
263 $self->{recheck_req_gen
}++;
270 if ($self->{recheck_done_gen
} != $self->{recheck_req_gen
}) {
271 $self->{dbh
} = undef unless $self->{dbh
}->ping;
272 # Handles a memory leak under Solaris/Postgres.
273 $self->{dbh
} = undef if ($self->{max_handles
} &&
274 $self->{handles_left
}-- < 0);
275 $self->{recheck_done_gen
} = $self->{recheck_req_gen
};
277 return $self->{dbh
} if $self->{dbh
};
280 $self->{dbh
} = DBI
->connect($self->{dsn
}, $self->{user
}, $self->{pass
}, {
283 # FUTURE: will default to on (have to validate all callers first):
284 RaiseError
=> ($self->{raise_errors
} || 0),
286 die "Failed to connect to database: " . DBI
->errstr;
287 $self->post_dbi_connect;
288 $self->{handles_left
} = $self->{max_handles
} if $self->{max_handles
};
294 return $self->dbh->ping;
298 my ($self, $optmsg) = @_;
299 my $dbh = $self->dbh;
300 return unless $dbh->err;
301 my ($pkg, $fn, $line) = caller;
302 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
303 $msg .= ": $optmsg" if $optmsg;
304 # Auto rollback failures around transactions.
305 if ($dbh->{AutoCommit
} == 0) { eval { $dbh->rollback }; }
310 my ($self, $sql, @do_params) = @_;
311 my $rv = eval { $self->dbh->do($sql, @do_params) };
312 return $rv unless $@
|| $self->dbh->err;
313 warn "Error with SQL: $sql\n";
314 Carp
::confess
($@
|| $self->dbh->errstr);
318 croak
("Odd number of parameters!") if scalar(@_) % 2;
319 my ($self, $vlist, %uarg) = @_;
321 $ret{$_} = delete $uarg{$_} foreach @
$vlist;
322 croak
("Bogus options: ".join(',',keys %uarg)) if %uarg;
326 sub was_deadlock_error
{
328 my $dbh = $self->dbh;
332 sub was_duplicate_error
{
334 my $dbh = $self->dbh;
338 # run a subref (presumably a database update) in an eval, because you expect it to
339 # maybe fail on duplicate key error, and throw a dup exception for you, else return
342 my ($self, $code) = @_;
343 my $rv = eval { $code->(); };
344 throw
("dup") if $self->was_duplicate_error;
349 # insert row if doesn't already exist
350 # WARNING: This function is NOT transaction safe if the duplicate errors causes
351 # your transaction to halt!
352 # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
353 # is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending on your database.
356 my ($self, $sql, @params) = @_;
357 my $dbh = $self->dbh;
358 if ($self->can_insertignore) {
359 return $dbh->do("INSERT IGNORE $sql", @params);
361 # TODO: Detect bad multi-row insert here.
362 my $rv = eval { $dbh->do("INSERT $sql", @params); };
363 if ($@
|| $dbh->err) {
364 return 1 if $self->was_duplicate_error;
365 # This chunk is identical to condthrow, but we include it directly
366 # here as we know there is definitely an error, and we would like
367 # the caller of this function.
368 my ($pkg, $fn, $line) = caller;
369 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
376 sub retry_on_deadlock
{
379 my $tries = shift || 3;
380 croak
("deadlock retries must be positive") if $tries < 1;
383 while ($tries-- > 0) {
384 $rv = eval { $code->(); };
385 next if ($self->was_deadlock_error);
392 # --------------------------------------------------------------------------
396 sub add_extra_tables
{
398 push @extra_tables, @_;
401 use constant TABLES
=> qw( domain class file tempfile file_to_delete
402 unreachable_fids file_on file_on_corrupt host
403 device server_settings file_to_replicate
404 file_to_delete_later fsck_log file_to_queue
410 my $curver = $sto->schema_version;
412 my $latestver = SCHEMA_VERSION
;
413 if ($curver == $latestver) {
414 $sto->status("Schema already up-to-date at version $curver.");
418 if ($curver > $latestver) {
419 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
423 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
426 foreach my $t (TABLES
, @extra_tables) {
427 $sto->create_table($t);
430 $sto->upgrade_add_host_getport;
431 $sto->upgrade_add_host_altip;
432 $sto->upgrade_add_device_asof;
433 $sto->upgrade_add_device_weight;
434 $sto->upgrade_add_device_readonly;
435 $sto->upgrade_add_device_drain;
436 $sto->upgrade_add_class_replpolicy;
437 $sto->upgrade_modify_server_settings_value;
438 $sto->upgrade_add_file_to_queue_arg;
439 $sto->upgrade_modify_device_size;
444 sub cached_schema_version
{
446 return $self->{_cached_schema_version
} ||=
447 $self->schema_version;
452 my $dbh = $self->dbh;
454 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
# Hook letting subclasses rewrite CREATE TABLE SQL (e.g. to strip
# MySQL-isms) before it is executed.  Base implementation is identity.
sub filter_create_sql {
    my ($self, $sql) = @_;
    return $sql;
}
461 my ($self, $table) = @_;
462 my $dbh = $self->dbh;
463 return 1 if $self->table_exists($table);
464 my $meth = "TABLE_$table";
465 my $sql = $self->$meth;
466 $sql = $self->filter_create_sql($sql);
467 $self->status("Running SQL: $sql;");
469 die "Failed to create table $table: " . $dbh->errstr;
470 my $imeth = "INDEXES_$table";
471 my @indexes = eval { $self->$imeth };
472 foreach $sql (@indexes) {
473 $self->status("Running SQL: $sql;");
475 die "Failed to create indexes on $table: " . $dbh->errstr;
479 # Please try to keep all tables aligned nicely
480 # with '"CREATE TABLE' on the first line
481 # and ')"' alone on the last line.
484 # classes are tied to domains. domains can have classes of items
485 # with different mindevcounts.
487 # a minimum devcount is the number of copies the system tries to
488 # maintain for files in that class
490 # unspecified classname means classid=0 (implicit class), and that
491 # implies mindevcount=2
492 "CREATE TABLE domain (
493 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
494 namespace VARCHAR(255),
500 "CREATE TABLE class (
501 dmid SMALLINT UNSIGNED NOT NULL,
502 classid TINYINT UNSIGNED NOT NULL,
503 PRIMARY KEY (dmid,classid),
504 classname VARCHAR(50),
505 UNIQUE (dmid,classname),
506 mindevcount TINYINT UNSIGNED NOT NULL
510 # the length field is only here for easy verifications of content
511 # integrity when copying around. no sums or content types or other
512 # metadata here. application can handle that.
514 # classid is what class of file this belongs to. for instance, on fotobilder
515 # there will be a class for original pictures (the ones the user uploaded)
516 # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
517 # each domain can setup classes and assign the minimum redundancy level for
518 # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and a 1 minimum for derived images (which means the sole device
520 # for a derived image can die, bringing devcount to 0 for that file, but
521 # the application can recreate it from its original)
524 fid INT UNSIGNED NOT NULL,
527 dmid SMALLINT UNSIGNED NOT NULL,
528 dkey VARCHAR(255), # domain-defined
529 UNIQUE dkey (dmid, dkey),
531 length BIGINT UNSIGNED, # big limit
533 classid TINYINT UNSIGNED NOT NULL,
534 devcount TINYINT UNSIGNED NOT NULL,
535 INDEX devcount (dmid,classid,devcount)
540 "CREATE TABLE tempfile (
541 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
544 createtime INT UNSIGNED NOT NULL,
545 classid TINYINT UNSIGNED NOT NULL,
546 dmid SMALLINT UNSIGNED NOT NULL,
552 # files marked for death when their key is overwritten. then they get a new
553 # fid, but since the old row (with the old fid) had to be deleted immediately,
554 # we need a place to store the fid so an async job can delete the file from
556 sub TABLE_file_to_delete
{
557 "CREATE TABLE file_to_delete (
558 fid INT UNSIGNED NOT NULL,
563 # if the replicator notices that a fid has no sources, that file gets inserted
564 # into the unreachable_fids table. it is up to the application to actually
565 # handle fids stored in this table.
566 sub TABLE_unreachable_fids
{
567 "CREATE TABLE unreachable_fids (
568 fid INT UNSIGNED NOT NULL,
569 lastupdate INT UNSIGNED NOT NULL,
575 # what files are on what devices? (most likely physical devices,
576 # as logical devices of RAID arrays would be costly, and mogilefs
577 # already handles redundancy)
579 # the devid index lets us answer "What files were on this now-dead disk?"
581 "CREATE TABLE file_on (
582 fid INT UNSIGNED NOT NULL,
583 devid MEDIUMINT UNSIGNED NOT NULL,
584 PRIMARY KEY (fid, devid),
589 # if application or framework detects an error in one of the duplicate files
590 # for whatever reason, it can register its complaint and the framework
591 # will do some verifications and fix things up w/ an async job
592 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
593 # on the other devices?
594 sub TABLE_file_on_corrupt
{
595 "CREATE TABLE file_on_corrupt (
596 fid INT UNSIGNED NOT NULL,
597 devid MEDIUMINT UNSIGNED NOT NULL,
598 PRIMARY KEY (fid, devid)
602 # hosts (which contain devices...)
605 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
607 status ENUM('alive','dead','down'),
608 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
609 http_get_port MEDIUMINT UNSIGNED,
611 hostname VARCHAR(40),
623 "CREATE TABLE device (
624 devid MEDIUMINT UNSIGNED NOT NULL,
625 hostid MEDIUMINT UNSIGNED NOT NULL,
627 status ENUM('alive','dead','down'),
628 weight MEDIUMINT DEFAULT 100,
630 mb_total INT UNSIGNED,
631 mb_used INT UNSIGNED,
632 mb_asof INT UNSIGNED,
638 sub TABLE_server_settings
{
639 "CREATE TABLE server_settings (
640 field VARCHAR(50) PRIMARY KEY,
645 sub TABLE_file_to_replicate
{
646 # nexttry is time to try to replicate it next.
647 # 0 means immediate. it's only on one host.
648 # 1 means lower priority. it's on 2+ but isn't happy where it's at.
649 # unix timestamp means at/after that time. some previous error occurred.
650 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
651 # failcount. how many times we've failed, just for doing backoff of nexttry.
652 # flags. reserved for future use.
653 "CREATE TABLE file_to_replicate (
654 fid INT UNSIGNED NOT NULL PRIMARY KEY,
655 nexttry INT UNSIGNED NOT NULL,
657 fromdevid INT UNSIGNED,
658 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
659 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
663 sub TABLE_file_to_delete_later
{
664 "CREATE TABLE file_to_delete_later (
665 fid INT UNSIGNED NOT NULL PRIMARY KEY,
666 delafter INT UNSIGNED NOT NULL,
672 "CREATE TABLE fsck_log (
673 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
675 utime INT UNSIGNED NOT NULL,
676 fid INT UNSIGNED NULL,
678 devid MEDIUMINT UNSIGNED,
683 # generic queue table, designed to be used for workers/jobs which aren't
684 # constantly in use, and are async to the user.
685 # ie; fsck, drain, rebalance.
686 sub TABLE_file_to_queue
{
687 "CREATE TABLE file_to_queue (
688 fid INT UNSIGNED NOT NULL,
690 type TINYINT UNSIGNED NOT NULL,
691 nexttry INT UNSIGNED NOT NULL,
692 failcount TINYINT UNSIGNED NOT NULL default '0',
693 flags SMALLINT UNSIGNED NOT NULL default '0',
695 PRIMARY KEY (fid, type),
696 INDEX type_nexttry (type,nexttry)
700 # new style async delete table.
701 # this is separate from file_to_queue since deletes are more actively used,
702 # and partitioning on 'type' doesn't always work so well.
703 sub TABLE_file_to_delete2
{
704 "CREATE TABLE file_to_delete2 (
705 fid INT UNSIGNED NOT NULL PRIMARY KEY,
706 nexttry INT UNSIGNED NOT NULL,
707 failcount TINYINT UNSIGNED NOT NULL default '0',
708 INDEX nexttry (nexttry)
# Schema-upgrade hooks.  The first five are only necessary for MySQL,
# since no other database existed before those schema changes; other
# drivers create the tables correctly to begin with, so the base
# implementations are success no-ops.  In the future, there might be new
# alters that non-MySQL databases will have to implement.
sub upgrade_add_host_getport    { return 1; }
sub upgrade_add_host_altip      { return 1; }
sub upgrade_add_device_asof     { return 1; }
sub upgrade_add_device_weight   { return 1; }
sub upgrade_add_device_readonly { return 1; }

# Newer upgrades: every concrete subclass must provide these.
sub upgrade_add_device_drain             { die "Not implemented in $_[0]" }
sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
sub upgrade_add_file_to_queue_arg        { die "Not implemented in $_[0]" }
sub upgrade_modify_device_size           { die "Not implemented in $_[0]" }
726 sub upgrade_add_class_replpolicy
{
728 unless ($self->column_type("class", "replpolicy")) {
729 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
733 # return true if deleted, 0 if didn't exist, exception if error
735 my ($self, $hostid) = @_;
736 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
739 # return true if deleted, 0 if didn't exist, exception if error
741 my ($self, $dmid) = @_;
742 throw
("has_files") if $self->domain_has_files($dmid);
743 throw
("has_classes") if $self->domain_has_classes($dmid);
744 return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
747 sub domain_has_files
{
748 my ($self, $dmid) = @_;
749 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
751 return $has_a_fid ?
1 : 0;
754 sub domain_has_classes
{
755 my ($self, $dmid) = @_;
756 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? LIMIT 1',
758 return $has_a_class ?
1 : 0;
761 sub class_has_files
{
762 my ($self, $dmid, $clid) = @_;
763 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
764 undef, $dmid, $clid);
765 return $has_a_fid ?
1 : 0;
768 # return new classid on success (non-zero integer), die on failure
769 # throw 'dup' on duplicate name
770 # override this if you want a less racy version.
772 my ($self, $dmid, $classname) = @_;
773 my $dbh = $self->dbh;
775 # get the max class id in this domain
776 my $maxid = $dbh->selectrow_array
777 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
779 # now insert the new class
781 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
782 undef, $dmid, $maxid + 1, $classname, 2);
784 if ($@
|| $dbh->err) {
785 if ($self->was_duplicate_error) {
789 return $maxid + 1 if $rv;
794 # return 1 on success, throw "dup" on duplicate name error, die otherwise
795 sub update_class_name
{
797 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
799 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
800 undef, $arg{classname
}, $arg{dmid
}, $arg{classid
});
802 throw
("dup") if $self->was_duplicate_error;
807 # return 1 on success, die otherwise
808 sub update_class_mindevcount
{
810 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
812 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
813 undef, $arg{mindevcount
}, $arg{dmid
}, $arg{classid
});
819 # return 1 on success, die otherwise
820 sub update_class_replpolicy
{
822 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
824 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
825 undef, $arg{replpolicy
}, $arg{dmid
}, $arg{classid
});
831 sub nfiles_with_dmid_classid_devcount
{
832 my ($self, $dmid, $classid, $devcount) = @_;
833 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
834 undef, $dmid, $classid, $devcount);
837 sub set_server_setting
{
838 my ($self, $key, $val) = @_;
839 my $dbh = $self->dbh;
840 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
844 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
846 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
850 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
854 # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
855 sub incr_server_setting
{
856 my ($self, $key, $val) = @_;
857 $val = 1 unless defined $val;
860 return 1 if $self->dbh->do("UPDATE server_settings ".
861 "SET value=value+? ".
862 "WHERE field=?", undef,
864 $self->set_server_setting($key, $val);
868 my ($self, $key) = @_;
869 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
873 # generic server setting cache.
874 # note that you can call the same server setting with different timeouts, but
875 # the timeout specified at the time of ... timeout, wins.
876 sub server_setting_cached
{
877 my ($self, $key, $timeout) = @_;
878 $self->{server_setting_cache
}->{$key} ||= {val
=> '', refresh
=> 0};
879 my $cache = $self->{server_setting_cache
}->{$key};
881 if ($now > $cache->{refresh
}) {
882 $cache->{val
} = $self->server_setting($key);
883 $cache->{refresh
} = $now + $timeout;
885 return $cache->{val
};
888 sub server_settings
{
891 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
893 while (my ($k, $v) = $sth->fetchrow_array) {
899 # register a tempfile and return the fidid, which should be allocated
900 # using autoincrement/sequences if the passed in fid is undef. however,
901 # if fid is passed in, that value should be used and returned.
903 # return new/passed in fidid on success.
904 # throw 'dup' if fid already in use
905 # return 0/undef/die on failure
907 sub register_tempfile
{
909 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
911 my $dbh = $self->dbh;
914 my $explicit_fid_used = $fid ?
1 : 0;
916 # setup the new mapping. we store the devices that we picked for
917 # this file in here, knowing that they might not be used. create_close
918 # is responsible for actually mapping in file_on. NOTE: fid is being
919 # passed in, it's either some number they gave us, or it's going to be
920 # 0/undef which translates into NULL which means to automatically create
921 # one. that should be fine.
922 my $ins_tempfile = sub {
924 # We must only pass the correct number of bind parameters
925 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
926 # Postgres, where you are expected to leave it out or use DEFAULT
927 # Leaving it out seems sanest and least likely to cause problems
928 # with other databases.
929 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
930 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
931 my @vals = ($arg{dmid
}, $arg{key
}, $arg{classid
} || 0, $arg{devids
});
932 # Do not check for $explicit_fid_used, but rather $fid directly
933 # as this anonymous sub is called from the loop later
935 unshift @keys, 'fid';
939 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
940 $dbh->do($sql, undef, @vals);
943 return undef if $self->was_duplicate_error;
944 die "Unexpected db error into tempfile: " . $dbh->errstr;
947 unless (defined $fid) {
948 # if they did not give us a fid, then we want to grab the one that was
949 # theoretically automatically generated
950 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
951 or die "No last_insert_id found";
953 return undef unless defined $fid && $fid > 0;
957 unless ($ins_tempfile->()) {
958 throw
("dup") if $explicit_fid_used;
959 die "tempfile insert failed";
962 my $fid_in_use = sub {
963 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
964 return $exists ?
1 : 0;
967 # if the fid is in use, do something
968 while ($fid_in_use->($fid)) {
969 throw
("dup") if $explicit_fid_used;
971 # be careful of databases which reset their
972 # auto-increment/sequences when the table is empty (InnoDB
973 # did/does this, for instance). So check if it's in use, and
974 # re-seed the table with the highest known fid from the file
977 # get the highest fid from the filetable and insert a dummy row
978 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
979 $ins_tempfile->(); # don't care about its result
981 # then do a normal auto-increment
983 $ins_tempfile->() or die "register_tempfile failed after seeding";
989 # return hashref of row containing columns "fid, dmid, dkey, length,
990 # classid, devcount" provided a $dmid and $key (dkey). or undef if no
992 sub file_row_from_dmid_key
{
993 my ($self, $dmid, $key) = @_;
994 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
995 "FROM file WHERE dmid=? AND dkey=?",
999 # return hashref of row containing columns "fid, dmid, dkey, length,
1000 # classid, devcount" provided a $fidid or undef if no row.
1001 sub file_row_from_fidid
{
1002 my ($self, $fidid) = @_;
1003 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1004 "FROM file WHERE fid=?",
1008 # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1009 # classid, devcount" provided a pair of $fidid or undef if no rows.
1010 sub file_row_from_fidid_range
{
1011 my ($self, $fromfid, $count) = @_;
1012 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1013 "FROM file WHERE fid > ? LIMIT ?");
1014 $sth->execute($fromfid,$count);
1015 return $sth->fetchall_arrayref({});
1018 # return array of devids that a fidid is on
1020 my ($self, $fidid) = @_;
1021 return @
{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
1022 undef, $fidid) || [] };
1025 # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1026 sub fid_devids_multiple
{
1027 my ($self, @fidids) = @_;
1028 my $in = join(",", map { $_+0 } @fidids);
1030 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1032 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1033 push @
{$ret->{$fidid} ||= []}, $devid;
1038 # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1039 sub tempfile_row_from_fid
{
1040 my ($self, $fidid) = @_;
1041 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1042 "FROM tempfile WHERE fid=?",
1046 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
1048 my ($self, $devid, $hostid, $status) = @_;
1049 my $rv = $self->conddup(sub {
1050 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1051 $devid, $hostid, $status);
1054 die "error making device $devid\n" unless $rv > 0;
1059 my ($self, $devid, $to_update) = @_;
1060 my @keys = sort keys %$to_update;
1061 return unless @keys;
1062 $self->conddup(sub {
1063 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1064 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
1070 sub update_device_usage
{
1072 my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
1074 $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
1075 " WHERE devid = ?", undef, $arg{mb_total
}, $arg{mb_used
}, $arg{devid
});
1080 # This is unimplemented at the moment as we must verify:
1081 # - no file_on rows exist
1082 # - nothing in file_to_queue is going to attempt to use it
1083 # - nothing in file_to_replicate is going to attempt to use it
1084 # - it's already been marked dead
1085 # - that all trackers are likely to know this :/
1086 # - ensure the devid can't be reused
1087 # IE; the user can't mark it dead then remove it all at once and cause their
1088 # cluster to implode.
1090 die "Unimplemented; needs further testing";
1093 sub mark_fidid_unreachable
{
1094 my ($self, $fidid) = @_;
1095 die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
1096 $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
1100 sub set_device_weight
{
1101 my ($self, $devid, $weight) = @_;
1103 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1108 sub set_device_state
{
1109 my ($self, $devid, $state) = @_;
1111 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1117 my ($self, $dmid, $cid) = @_;
1118 throw
("has_files") if $self->class_has_files($dmid, $cid);
1120 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1126 my ($self, $fidid) = @_;
1127 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
1129 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1131 $self->enqueue_for_delete2($fidid, 0);
1135 sub delete_tempfile_row
{
1136 my ($self, $fidid) = @_;
1137 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1142 # Load the specified tempfile, then delete it. If we succeed, we were
1143 # here first; otherwise, someone else beat us here (and we return undef)
1144 sub delete_and_return_tempfile_row
{
1145 my ($self, $fidid) = @_;
1146 my $rv = $self->tempfile_row_from_fid($fidid);
1147 my $rows_deleted = $self->delete_tempfile_row($fidid);
1148 return $rv if ($rows_deleted > 0);
# (Re)insert a file row, replacing any existing row for the fid.
# Requires REPLACE support.  devcount starts at 0.
sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                   "VALUES (?,?,?,?,?,0) ", undef,
                   @arg{'fidid', 'dmid', 'key', 'length', 'classid'});
    $self->condthrow;
}
# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
# NOTE(review): eval wrapper and return paths were truncated in source;
# reconstructed — verify against upstream.
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # first is MySQL's error code for duplicates
        if ($self->was_duplicate_error) {
            return 0;
        } else {
            die $@;
        }
    }
    return 1;
}
# Look up a domain id by its namespace name; returns undef if not found.
sub get_domainid_by_name {
    my $self = shift;
    my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
                                             undef, $_[0]);
    return $dmid;
}
# returns a hash of domains. Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { ($_->[0], $_->[1]) } @{ $domains || [] };
}
# Look up a classid by (dmid, classname); returns undef if not found.
sub get_classid_by_name {
    my $self = shift;
    my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
                                                undef, $_[0], $_[1]);
    return $classid;
}
# returns an array of hashrefs, one hashref per row in the 'class' table.
# Includes replpolicy only when the schema is new enough to have it.
sub get_all_classes {
    my ($self) = @_;
    my (@ret, $row);

    my $repl_col = "";
    if ($self->cached_schema_version >= 10) {
        $repl_col = ", replpolicy";
    }

    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
    $sth->execute;
    push @ret, $row while $row = $sth->fetchrow_hashref;
    return @ret;
}
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}
# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                                   undef, $fidid, $devid); };
    $self->condthrow;
    return $rv;
}
# Test if host exists: returns the hostid, or undef if not present.
sub get_hostid_by_id {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
                                               undef, $_[0]);
    return $hostid;
}
# Look up a hostid by hostname; returns undef if not found.
sub get_hostid_by_name {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
                                               undef, $_[0]);
    return $hostid;
}
# get all hosts from database, returns them as list of hashrefs, hashrefs
# being the row contents.  (Header/execute/return were truncated; restored.)
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# get all devices from database, returns them as list of hashrefs, hashrefs
# being the row contents.  (Header/execute/return were truncated; restored.)
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# update the device count for a given fidid (recount from file_on and
# store the result on the file row).  Returns 1; dies on DB error.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);
    eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
                    $ct, $fidid); };
    $self->condthrow;
    return 1;
}
# update the classid for a given fidid.  Returns 1; dies on DB error.
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;
    $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
             $classid, $fidid);
    $self->condthrow;
    return 1;
}
# enqueue a fidid for replication, from a specific deviceid (can be undef),
# in a given number of seconds.  nexttry is computed as a SQL expression so
# the DB clock, not the tracker clock, is authoritative.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# this instead.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}
# enqueue a fidid for work of a given type, $in seconds from now.
# $fidid may be a plain fid, or an arrayref of [fid, devid, arg].
# NOTE(review): the branch condition was truncated in source; reconstructed
# as a ref() test based on the two visible insert shapes — verify upstream.
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 $fidid->[0], $fidid->[1], $fidid->[2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}
# Bulk version of enqueue_for_todo.  return 1 on success. die otherwise.
# Falls back to one-at-a-time when the backend can't do multi-row
# insert-ignore/replace safely.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
        return 1;
    }

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        if (ref($fidids->[0]) eq 'ARRAY') {
            # each element is [fid, devid, arg]
            my $sql = $self->ignore_replace .
                "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
                join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
            $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
        } else {
            # plain fids; values are inlined (int() sanitized) rather than bound
            $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type, nexttry) VALUES " .
                join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
        }
    });
    return 1;
}
# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only ran
# from a single tracker.
sub file_queue_length {
    my $self = shift;
    my $type = shift;
    return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
                                       "WHERE type = ?", undef, $type);
}
# reschedule all deferred replication, return number rescheduled
# NOTE(review): sub header was truncated in source; name restored from
# the upstream MogileFS::Store API.
sub replicate_now {
    my ($self) = @_;

    return $self->retry_on_deadlock(sub {
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                              " WHERE nexttry > " . $self->unix_timestamp);
    });
}
# takes two arguments, devid and limit, both required. returns an arrayref
# of fidids.
# Fix: $limit was interpolated into the SQL unsanitized; force it to an
# integer to keep the statement injection-safe.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}
# finds a chunk of fids given a set of constraints:
#   devid, fidid, age (new or old), limit
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing a new index.
# returns an arrayref of fidids
# NOTE(review): sort-order assignments and the limit default were truncated
# in source; reconstructed — verify against upstream.
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;

    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    $age ||= 'old';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }
    $limit = int($limit || 100);

    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}
# takes two arguments, fidid to be above, and optional limit (default
# 1,000). returns up to that many fidids above the provided
# fidid. returns array of MogileFS::FID objects, sorted by fid ids.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my @ret;
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                            "FROM file ".
                            "WHERE fid > ? ".
                            "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, MogileFS::FID->new_from_db_row($row);
    }
    return @ret;
}
# Same as above, but returns an arrayref of plain fidids instead of
# blessed MogileFS::FID objects.
sub get_fidids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file WHERE fid > ?
        ORDER BY fid LIMIT $limit}, undef, $fidid);
    return $fidids;
}
# creates a new domain, given a domain namespace string. return the dmid on
# success, throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}
# Update arbitrary columns of a host row from a hashref of col => value.
# Throws 'dup' (via conddup) on unique-constraint violation.
# (Sub header was truncated in source; restored.)
sub update_host {
    my ($self, $hid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
            . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
            $hid);
    });
    return 1;
}
# Update a single host column.  $col is interpolated and must come from
# trusted (server-internal) callers, not user input.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}
# return new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
    die "db failure";
}
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
# NOTE(review): ORDER BY / LIMIT lines were truncated in source;
# reconstructed — verify against upstream.
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}
1563 # "new" style queue consumption code.
1564 # from within a transaction, fetch a limit of fids,
1565 # then update each fid's nexttry to be off in the future,
1566 # giving local workers some time to dequeue the items.
1568 # DBI (even with RaiseError) returns weird errors on
1569 # deadlocks from selectall_hashref. So we can't do that.
1570 # we also used to retry on deadlock within the routine,
1571 # but instead lets return undef and let job_master retry.
1572 sub grab_queue_chunk
{
1576 my $extfields = shift;
1578 my $dbh = $self->dbh;
1582 my $extwhere = shift || '';
1583 my $fields = 'fid, nexttry, failcount';
1584 $fields .= ', ' . $extfields if $extfields;
1587 my $ut = $self->unix_timestamp;
1591 WHERE nexttry
<= $ut
1596 $query .= "FOR UPDATE\n" if $self->can_for_update;
1597 my $sth = $dbh->prepare($query);
1599 $work = $sth->fetchall_hashref('fid');
1600 # Nothing to work on.
1601 # Now claim the fids for a while.
1602 # TODO: Should be configurable... but not necessary.
1603 my $fidlist = join(',', keys %$work);
1604 unless ($fidlist) { $dbh->commit; return; }
1605 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1608 if ($self->was_deadlock_error) {
1609 eval { $dbh->rollback };
1614 return defined $work ?
values %$work : ();
# Claim up to $limit rows from the replication queue; returns row hashrefs.
sub grab_files_to_replicate {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_replicate', $limit,
                                   'fromdevid, flags');
}
# Claim up to $limit rows from the delete2 queue; returns row hashrefs.
sub grab_files_to_delete2 {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_delete2', $limit);
}
# $extwhere is ugly... but should be fine.
# Claim up to $limit rows of a given $type from the generic work queue.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    $what ||= 'type, flags';
    return $self->grab_queue_chunk('file_to_queue', $limit,
                                   $what, 'AND type = ' . $type);
}
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    1;
}
# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways. so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
}
# Fetch the replication-queue row for a fid, or undef if not queued.
sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
                                         undef, $fidid);
}
# Fetch the delete2-queue row for a fid, or undef if not queued.
sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
                                         undef, $fidid);
}
# Fetch the generic-queue row for a (fid, type) pair, or undef if not queued.
sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
                                         undef, $fidid, $type);
}
# Remove a fid from the replication queue.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
    });
}
# Remove a (fid, type) pair from the generic work queue.
sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
                       undef, $fidid, $type);
    });
}
# Remove a fid from the delete2 queue.
sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
    });
}
# Reschedule a queued replication to an absolute unix time, bumping failcount.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $abstime, $fid);
    });
}
# Reschedule a queued replication $in_n_secs from the DB's current time,
# bumping failcount.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $in_n_secs, $fid);
    });
}
# Reschedule a queued delete2 to an absolute unix time, bumping failcount.
sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $abstime, $fid);
    });
}
# Reschedule a queued delete2 $in_n_secs from the DB's current time,
# bumping failcount.
sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $in_n_secs, $fid);
    });
}
# Given a dmid, prefix, after and limit, return an arrayref of dkey from the
# file table.
# NOTE(review): the header and the prefix/limit normalization lines were
# truncated in source; reconstructed — verify against upstream.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;
    $prefix .= '%';
    $after = '' unless defined $after;

    $limit = int($limit || 1000);

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}
# return arrayref of all tempfile rows (themselves also arrayrefs, of
# [$fidid, $devids]) that were created $secs_old seconds ago or older.
# (Sub header was truncated in source; name restored from upstream.)
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
        "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        # backend can't do multi-row inserts; do them one at a time
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}
# Persist the schema version as a server setting.
# NOTE: the misspelled name ("vesion") is the established public API of this
# class — callers elsewhere use it, so it must not be renamed here.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}
# returns array of fidids to try and delete again
# NOTE(review): SELECT column list and LIMIT were truncated in source;
# reconstructed — verify against upstream.
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };
}
# return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
            join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# Same as enqueue_fids_to_delete, but for the newer file_to_delete2 queue
# (which carries a nexttry timestamp).  return 1 on success. die otherwise.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
# Summarizes new fsck_log rows into per-evcode counters stored as server
# settings, under a lock so only one tracker summarizes at a time.
# Returns 0 if the lock was held elsewhere.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid++;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}
# Append a row to the fsck_log table.  Accepts opts: fid, code, devid.
# Croaks on unrecognized opts.  (Sub header and the fid/code bind lines
# were truncated in source; reconstructed — verify against upstream.)
sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef,
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;
}
# Return the database server's idea of the current unix time.
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}
# Return the highest fid in the file table (undef if the table is empty).
# (Sub header was truncated in source; name restored from upstream.)
sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}
# Return the highest logid in fsck_log, or 0 if the table is empty.
sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}
# returns array of $row hashrefs, from fsck_log table
# NOTE(review): WHERE/ORDER/LIMIT lines were truncated in source;
# reconstructed — verify against upstream.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}
# Count fsck_log rows grouped by evcode, constrained by either time_gte
# (utime >= value) or logid_range ([min, max], inclusive).  Returns a
# hashref of evcode => count.
# NOTE(review): interior lines (opts validation, GROUP BY, result
# accumulation) were truncated in source; reconstructed — verify upstream.
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die "Unknown opts" if %opts;

    my %ret;
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret{$ev} = $ct;
    }
    return \%ret;
}
# run before daemonizing. you can die from here if you see something's amiss.
# or emit warnings.  Base-class implementation is intentionally a no-op;
# subclasses override it.
sub pre_daemonize_checks { }
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout. dies if more than one lock is
# already outstanding.  Base class is abstract: subclasses must override.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}
# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
# Base class is abstract: subclasses must override.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# returns up to $limit @fidids which are on provided $devid
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random. and good chances these will
    # eventually get to point where they're un-rebalance-able, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []
    });

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;
}
2000 MogileFS::Store - data storage provider. base class.
MogileFS aims to be database-independent.  The server creates a singleton
instance of a C<MogileFS::Store> subclass — such as
L<MogileFS::Store::MySQL>, L<MogileFS::Store::SQLite>, or
L<MogileFS::Store::Postgres> — and all database interaction is done
through it.
2012 L<MogileFS::Store::MySQL>