package MogileFS::Store;

use strict;
use warnings;
use Carp qw(croak);
use MogileFS::Util qw(throw max error);
use DBI;  # no reason a Store has to be DBI-based, but for now they all are.
use List::Util qw(shuffle);
# this is incremented whenever the schema changes.  server will refuse
# to start-up with an old schema version
#
#  6: adds file_to_replicate table
#  7: adds file_to_delete_later table
#  8: adds fsck_log table
#  9: adds 'drain' state to enum in device table
# 10: adds 'replpolicy' column to 'class' table
# 11: adds 'file_to_queue' table
# 12: adds 'file_to_delete2' table
# 13: modifies 'server_settings.value' to TEXT for wider values
#     also adds a TEXT 'arg' column to file_to_queue for passing arguments
# 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
use constant SCHEMA_VERSION => 14;
sub new {
    my ($class) = @_;
    return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
}

sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass, $max_handles) = @_;
    my $subclass;
    if ($dsn =~ /^DBI:mysql:/i) {
        $subclass = "MogileFS::Store::MySQL";
    } elsif ($dsn =~ /^DBI:SQLite:/i) {
        $subclass = "MogileFS::Store::SQLite";
    } elsif ($dsn =~ /^DBI:Oracle:/i) {
        $subclass = "MogileFS::Store::Oracle";
    } elsif ($dsn =~ /^DBI:Pg:/i) {
        $subclass = "MogileFS::Store::Postgres";
    } else {
        die "Unknown database type: $dsn";
    }
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
    }
    my $self = bless {
        dsn    => $dsn,
        user   => $user,
        pass   => $pass,
        max_handles => $max_handles,  # Max number of handles to allow
        raise_errors => $subclass->want_raise_errors,
        slave_list_version => 0,
        slave_list_cache => [],
        recheck_req_gen  => 0, # incremented generation, of recheck of dbh being requested
        recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
        handles_left     => 0, # amount of times this handle can still be verified
        connected_slaves => {},
        dead_slaves      => {},
    }, $subclass;
    return $self;
}
# Defaults to true now.
sub want_raise_errors { 1 }

sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    # where args is:  dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});

    my $try_make_sto = sub {
        my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return undef;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        $sto->raise_errors;
        return $sto;
    };

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;
    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user. trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or
        die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create/Upgrade database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
}
# given a root DBI connection, create the named database.  succeed
# if it's made, or already exists.  die otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
}

sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
}

sub can_replace      { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }
sub can_for_update   { 1 }

sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }

sub ignore_replace {
    my $self = shift;
    return "INSERT IGNORE " if $self->can_insertignore;
    return "REPLACE " if $self->can_replace;
    die "Can't INSERT IGNORE or REPLACE?";
}
my $on_status  = sub {};
my $on_confirm = sub { 1 };
sub on_status  { my ($pkg, $code) = @_; $on_status  = $code; };
sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
sub status     { my ($pkg, $msg)  = @_; $on_status->($msg); };
sub confirm    { my ($pkg, $msg)  = @_; $on_confirm->($msg) or die "Aborted.\n"; };

sub latest_schema_version { SCHEMA_VERSION }
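
# Example (illustrative): mogdbsetup-style callers can hook status/confirm
# output before running schema operations.  The print/prompt behavior below
# is just a sketch.
#
#   MogileFS::Store->on_status(sub { print "STATUS: $_[0]\n" });
#   MogileFS::Store->on_confirm(sub {
#       print "$_[0] [y/N] ";
#       chomp(my $ans = <STDIN>);
#       return lc($ans) eq 'y';
#   });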
sub raise_errors {
    my $self = shift;
    $self->{raise_errors} = 1;
    $self->dbh->{RaiseError} = 1;
}

sub dsn  { $_[0]{dsn}  }
sub user { $_[0]{user} }
sub pass { $_[0]{pass} }

sub post_dbi_connect { 1 }

sub can_do_slaves { 0 }

sub mark_as_slave {
    my $self = shift;
    die "Incapable of becoming slave." unless $self->can_do_slaves;
    $self->{slave} = 1;
}

sub is_slave {
    my $self = shift;
    return $self->{slave};
}
sub _slaves_list_changed {
    my $self = shift;
    my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
    if ($ver <= $self->{slave_list_version}) {
        return 0;
    }
    $self->{slave_list_version} = $ver;
    # Restart connections from scratch if the configuration changed.
    $self->{connected_slaves} = {};
    return 1;
}

# Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
sub _slaves_list {
    my $self = shift;

    my $sk = MogileFS::Config->server_setting_cached('slave_keys')
        or return ();

    my @ret;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting_cached("slave_$key");

        if (!$slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (!defined($dsn) or !defined($user) or !defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @ret, [$dsn, $user, $pass];
    }

    $self->{slave_list_cache} = \@ret;
    return @ret;
}

sub _pick_slave {
    my $self = shift;
    my @temp = shuffle keys %{$self->{connected_slaves}};
    return unless @temp;
    return $self->{connected_slaves}->{$temp[0]};
}
228 die "Incapable of having slaves." unless $self->can_do_slaves;
230 $self->{slave
} = undef;
231 unless ($self->_slaves_list_changed) {
232 if ($self->{slave
} = $self->_pick_slave) {
233 $self->{slave
}->{recheck_req_gen
} = $self->{recheck_req_gen
};
234 return $self->{slave
} if $self->check_slave;
238 if ($self->{slave
}) {
239 my $dsn = $self->{slave
}->{dsn
};
240 $self->{dead_slaves
}->{$dsn} = $now;
241 delete $self->{connected_slaves
}->{$dsn};
242 error
("Error talking to slave: $dsn");
244 my @slaves_list = $self->_slaves_list;
246 # If we have no slaves, then return silently.
247 return unless @slaves_list;
249 unless (MogileFS
::Config
->server_setting_cached('slave_skip_filtering') eq 'on') {
250 MogileFS
::run_global_hook
('slave_list_filter', \
@slaves_list);
254 MogileFS
::Config
->server_setting_cached('slave_dead_retry_timeout') || 15;
256 foreach my $slave_fulldsn (@slaves_list) {
257 my $dead_timeout = $self->{dead_slaves
}->{$slave_fulldsn->[0]};
258 next if (defined $dead_timeout && $dead_timeout + $dead_retry > $now);
259 next if ($self->{connected_slaves
}->{$slave_fulldsn->[0]});
261 my $newslave = $self->{slave
} = $self->new_from_dsn_user_pass(@
$slave_fulldsn);
262 $self->{slave
}->{next_check
} = 0;
263 $newslave->mark_as_slave;
264 if ($self->check_slave) {
265 $self->{connected_slaves
}->{$slave_fulldsn->[0]} = $newslave;
267 $self->{dead_slaves
}->{$slave_fulldsn->[0]} = $now;
271 if ($self->{slave
} = $self->_pick_slave) {
272 return $self->{slave
};
274 warn "Slave list exhausted, failing back to master.";
sub read_store {
    my $self = shift;

    return $self unless $self->can_do_slaves;

    if ($self->{slave_ok}) {
        if (my $slave = $self->get_slave) {
            return $slave;
        }
    }

    return $self;
}
sub slaves_ok {
    my $self = shift;
    my $coderef = shift;

    return unless ref $coderef eq 'CODE';

    local $self->{slave_ok} = 1;

    return $coderef->(@_);
}

sub recheck_dbh {
    my $self = shift;
    $self->{recheck_req_gen}++;
}
sub dbh {
    my $self = shift;

    if ($self->{dbh}) {
        if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
            $self->{dbh} = undef unless $self->{dbh}->ping;
            # Handles a memory leak under Solaris/Postgres.
            $self->{dbh} = undef if ($self->{max_handles} &&
                $self->{handles_left}-- < 0);
            $self->{recheck_done_gen} = $self->{recheck_req_gen};
        }
        return $self->{dbh} if $self->{dbh};
    }

    $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
        PrintError => 0,
        AutoCommit => 1,
        # FUTURE: will default to on (have to validate all callers first):
        RaiseError => ($self->{raise_errors} || 0),
    }) or
        die "Failed to connect to database: " . DBI->errstr;
    $self->post_dbi_connect;
    $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
    return $self->{dbh};
}
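
# Example (illustrative): long-running workers call recheck_dbh once per
# loop iteration so the next dbh() call re-verifies the cached handle with
# a ping before reusing it.
#
#   $sto->recheck_dbh;
#   my $dbh = $sto->dbh;   # reconnects transparently if the old handle died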
sub have_dbh { return 1 if $_[0]->{dbh}; }

sub ping {
    my $self = shift;
    return $self->dbh->ping;
}

sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    return 1 unless $dbh->err;
    my ($pkg, $fn, $line) = caller;
    my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
    $msg .= ": $optmsg" if $optmsg;
    # Auto rollback failures around transactions.
    if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
    croak($msg);
}

sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    return $rv unless $@ || $self->dbh->err;
    warn "Error with SQL: $sql\n";
    Carp::confess($@ || $self->dbh->errstr);
}

sub _valid_params {
    croak("Odd number of parameters!") if scalar(@_) % 2;
    my ($self, $vlist, %uarg) = @_;
    my %ret;
    $ret{$_} = delete $uarg{$_} foreach @$vlist;
    croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
    return %ret;
}
sub was_deadlock_error {
    my $self = shift;
    my $dbh = $self->dbh;
    die "UNIMPLEMENTED";
}

sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;
    die "UNIMPLEMENTED";
}
# run a subref (presumably a database update) in an eval, because you expect it to
# maybe fail on duplicate key error, and throw a dup exception for you, else return
# its return value
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    throw("dup") if $self->was_duplicate_error;
    return $rv;
}

# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if a duplicate error causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending on your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;
    if ($self->can_insertignore) {
        return $dbh->do("INSERT IGNORE $sql", @params);
    } else {
        # TODO: Detect bad multi-row insert here.
        my $rv = eval { $dbh->do("INSERT $sql", @params); };
        if ($@ || $dbh->err) {
            return 1 if $self->was_duplicate_error;
            # This chunk is identical to condthrow, but we include it directly
            # here as we know there is definitely an error, and we want to
            # report the caller of this function.
            my ($pkg, $fn, $line) = caller;
            my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
            croak($msg);
        }
        return $rv;
    }
}
sub retry_on_deadlock {
    my $self  = shift;
    my $code  = shift;
    my $tries = shift || 3;
    croak("deadlock retries must be positive") if $tries < 1;
    my $rv;

    while ($tries-- > 0) {
        $rv = eval { $code->(); };
        next if ($self->was_deadlock_error);
        $self->condthrow;
        last;
    }
    return $rv;
}
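
# Example (illustrative): wrap a single statement that may hit a deadlock;
# the closure is retried up to three times before giving up.
#
#   $sto->retry_on_deadlock(sub {
#       $sto->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, 1234);
#   });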
# --------------------------------------------------------------------------

my @extra_tables;

sub add_extra_tables {
    my $class = shift;
    push @extra_tables, @_;
}

use constant TABLES => qw( domain class file tempfile file_to_delete
                           unreachable_fids file_on file_on_corrupt host
                           device server_settings file_to_replicate
                           file_to_delete_later fsck_log file_to_queue
                           file_to_delete2 );

sub setup_database {
    my $sto = shift;

    my $curver = $sto->schema_version;

    my $latestver = SCHEMA_VERSION;
    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;
    }

    if ($curver > $latestver) {
        die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver.  Aborting to be safe.\n";
    }

    $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");

    foreach my $t (TABLES, @extra_tables) {
        $sto->create_table($t);
    }

    $sto->upgrade_add_host_getport;
    $sto->upgrade_add_host_altip;
    $sto->upgrade_add_device_asof;
    $sto->upgrade_add_device_weight;
    $sto->upgrade_add_device_readonly;
    $sto->upgrade_add_device_drain;
    $sto->upgrade_add_class_replpolicy;
    $sto->upgrade_modify_server_settings_value;
    $sto->upgrade_add_file_to_queue_arg;
    $sto->upgrade_modify_device_size;

    return 1;
}
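
# Example (illustrative): how mogdbsetup-style code might drive the schema
# install/upgrade path.  All connection arguments here are placeholders.
#
#   my $sto = MogileFS::Store->new_from_mogdbsetup(
#       dbhost => "localhost", dbport => 3306, dbname => "mogilefs",
#       dbrootuser => "root",  dbrootpass => "",
#       dbuser => "mogile",    dbpass => "sekrit",
#   );
#   $sto->setup_database or die "Failed to set up database";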
sub cached_schema_version {
    my $self = shift;
    return $self->{_cached_schema_version} ||=
        $self->schema_version;
}

sub schema_version {
    my $self = shift;
    my $dbh = $self->dbh;
    return eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
    } || 0;
}

sub filter_create_sql { my ($self, $sql) = @_; return $sql; }

sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);
    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;
    my $imeth = "INDEXES_$table";
    my @indexes = eval { $self->$imeth };
    foreach $sql (@indexes) {
        $self->status("Running SQL: $sql;");
        $dbh->do($sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;
    }
}
# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.

# classes are tied to domains.  domains can have classes of items
# with different mindevcounts.
#
# a minimum devcount is the number of copies the system tries to
# maintain for files in that class
#
# unspecified classname means classid=0 (implicit class), and that
# implies mindevcount=2
sub TABLE_domain {
    "CREATE TABLE domain (
    dmid          SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
    namespace     VARCHAR(255),
    UNIQUE (namespace)
)"
}

sub TABLE_class {
    "CREATE TABLE class (
    dmid          SMALLINT UNSIGNED NOT NULL,
    classid       TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname     VARCHAR(50),
    UNIQUE      (dmid,classname),
    mindevcount   TINYINT UNSIGNED NOT NULL
)"
}

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  application can handle that.
#
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
# each domain can set up classes and assign the minimum redundancy level for
# each class.  fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
sub TABLE_file {
    "CREATE TABLE file (
    fid           INT UNSIGNED NOT NULL,
    PRIMARY KEY   (fid),

    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey          VARCHAR(255),   # domain-defined
    UNIQUE dkey   (dmid, dkey),

    length        BIGINT UNSIGNED, # big limit

    classid       TINYINT UNSIGNED NOT NULL,
    devcount      TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)
)"
}
584 "CREATE TABLE tempfile (
585 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
588 createtime INT UNSIGNED NOT NULL,
589 classid TINYINT UNSIGNED NOT NULL,
590 dmid SMALLINT UNSIGNED NOT NULL,
596 # files marked for death when their key is overwritten. then they get a new
597 # fid, but since the old row (with the old fid) had to be deleted immediately,
598 # we need a place to store the fid so an async job can delete the file from
600 sub TABLE_file_to_delete
{
601 "CREATE TABLE file_to_delete (
602 fid INT UNSIGNED NOT NULL,
607 # if the replicator notices that a fid has no sources, that file gets inserted
608 # into the unreachable_fids table. it is up to the application to actually
609 # handle fids stored in this table.
610 sub TABLE_unreachable_fids
{
611 "CREATE TABLE unreachable_fids (
612 fid INT UNSIGNED NOT NULL,
613 lastupdate INT UNSIGNED NOT NULL,
619 # what files are on what devices? (most likely physical devices,
620 # as logical devices of RAID arrays would be costly, and mogilefs
621 # already handles redundancy)
623 # the devid index lets us answer "What files were on this now-dead disk?"
625 "CREATE TABLE file_on (
626 fid INT UNSIGNED NOT NULL,
627 devid MEDIUMINT UNSIGNED NOT NULL,
628 PRIMARY KEY (fid, devid),
633 # if application or framework detects an error in one of the duplicate files
634 # for whatever reason, it can register its complaint and the framework
635 # will do some verifications and fix things up w/ an async job
636 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
637 # on the other devices?
638 sub TABLE_file_on_corrupt
{
639 "CREATE TABLE file_on_corrupt (
640 fid INT UNSIGNED NOT NULL,
641 devid MEDIUMINT UNSIGNED NOT NULL,
642 PRIMARY KEY (fid, devid)
646 # hosts (which contain devices...)
649 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
651 status ENUM('alive','dead','down'),
652 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
653 http_get_port MEDIUMINT UNSIGNED,
655 hostname VARCHAR(40),
667 "CREATE TABLE device (
668 devid MEDIUMINT UNSIGNED NOT NULL,
669 hostid MEDIUMINT UNSIGNED NOT NULL,
671 status ENUM('alive','dead','down'),
672 weight MEDIUMINT DEFAULT 100,
674 mb_total INT UNSIGNED,
675 mb_used INT UNSIGNED,
676 mb_asof INT UNSIGNED,
sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field   VARCHAR(50) PRIMARY KEY,
    value   TEXT
)"
}

sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unix timestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry    INT UNSIGNED NOT NULL,
    INDEX (nexttry),
    fromdevid  INT UNSIGNED,
    failcount  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags      SMALLINT UNSIGNED NOT NULL DEFAULT 0
)"
}

sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    fid      INT UNSIGNED NOT NULL PRIMARY KEY,
    delafter INT UNSIGNED NOT NULL,
    INDEX (delafter)
)"
}

sub TABLE_fsck_log {
    "CREATE TABLE fsck_log (
    logid  INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (logid),
    utime  INT UNSIGNED NOT NULL,
    fid    INT UNSIGNED NULL,
    evcode CHAR(4),
    devid  MEDIUMINT UNSIGNED,
    INDEX(utime)
)"
}

# generic queue table, designed to be used for workers/jobs which aren't
# constantly in use, and are async to the user.
# ie; fsck, drain, rebalance.
sub TABLE_file_to_queue {
    "CREATE TABLE file_to_queue (
    fid       INT UNSIGNED NOT NULL,
    devid     INT UNSIGNED,
    type      TINYINT UNSIGNED NOT NULL,
    nexttry   INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    flags     SMALLINT UNSIGNED NOT NULL default '0',
    arg       TEXT,
    PRIMARY KEY (fid, type),
    INDEX type_nexttry (type,nexttry)
)"
}

# new style async delete table.
# this is separate from file_to_queue since deletes are more actively used,
# and partitioning on 'type' doesn't always work so well.
sub TABLE_file_to_delete2 {
    "CREATE TABLE file_to_delete2 (
    fid       INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry   INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    INDEX nexttry (nexttry)
)"
}
# these five are only necessary for MySQL, since no other database existed
# before, so they can just create the tables correctly to begin with.
# in the future, there might be new alters that non-MySQL databases
# will have to implement.
sub upgrade_add_host_getport    { 1 }
sub upgrade_add_host_altip      { 1 }
sub upgrade_add_device_asof     { 1 }
sub upgrade_add_device_weight   { 1 }
sub upgrade_add_device_readonly { 1 }
sub upgrade_add_device_drain    { die "Not implemented in $_[0]" }
sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
sub upgrade_add_file_to_queue_arg        { die "Not implemented in $_[0]" }
sub upgrade_modify_device_size           { die "Not implemented in $_[0]" }

sub upgrade_add_class_replpolicy {
    my ($self) = @_;
    unless ($self->column_type("class", "replpolicy")) {
        $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
    }
}
# return true if deleted, 0 if didn't exist, exception if error
sub delete_host {
    my ($self, $hostid) = @_;
    return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
}

# return true if deleted, 0 if didn't exist, exception if error
sub delete_domain {
    my ($self, $dmid) = @_;
    throw("has_files")   if $self->domain_has_files($dmid);
    throw("has_classes") if $self->domain_has_classes($dmid);
    return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
}

sub domain_has_files {
    my ($self, $dmid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                                undef, $dmid);
    return $has_a_fid ? 1 : 0;
}

sub domain_has_classes {
    my ($self, $dmid) = @_;
    my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? LIMIT 1',
                                                  undef, $dmid);
    return $has_a_class ? 1 : 0;
}

sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ? 1 : 0;
}
# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
# override this if you want a less racy version.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    # get the max class id in this domain
    my $maxid = $dbh->selectrow_array
        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;

    my $clsid = $maxid + 1;
    if ($classname eq 'default') {
        $clsid = 0;
    }

    # now insert the new class
    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $clsid, $classname, 2);
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            throw("dup");
        }
    }
    return $clsid if $rv;
    die;
}

# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @_);
    eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, $arg{classname}, $arg{dmid}, $arg{classid});
    };
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_mindevcount {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    eval {
        $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                       undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_replpolicy {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid replpolicy)], @_);
    eval {
        $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
                       undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    return 1;
}

sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);
}
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    if (defined $val) {
        $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}

# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    return 1 if $self->dbh->do("UPDATE server_settings ".
                               "SET value=value+? ".
                               "WHERE field=?", undef,
                               $val, $key) > 0;
    $self->set_server_setting($key, $val);
}

sub server_setting {
    my ($self, $key) = @_;
    return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                       undef, $key);
}

sub server_settings {
    my ($self) = @_;
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;
    while (my ($k, $v) = $sth->fetchrow_array) {
        $ret->{$k} = $v;
    }
    return $ret;
}
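
# Example (illustrative): server settings are simple field/value pairs, so a
# round trip looks like this (the setting names are made up):
#
#   $sto->set_server_setting("my_flag", "on");
#   my $val = $sto->server_setting("my_flag");   # "on"
#   $sto->incr_server_setting("my_counter", 5);  # creates it at 5 if missing
#   $sto->set_server_setting("my_flag", undef);  # deletes the row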
# register a tempfile and return the fidid, which should be allocated
# using autoincrement/sequences if the passed in fid is undef.  however,
# if fid is passed in, that value should be used and returned.
#
# return new/passed in fidid on success.
# throw 'dup' if fid already in use
# return 0/undef/die on failure
#
sub register_tempfile {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    my $ins_tempfile = sub {
        # We must only pass the correct number of bind parameters
        # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
        # Postgres, where you are expected to leave it out or use DEFAULT
        # Leaving it out seems sanest and least likely to cause problems
        # with other databases.
        my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
        my @vars = ('?'   , '?'   , '?'      , '?'     , $self->unix_timestamp);
        my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
        # Do not check for $explicit_fid_used, but rather $fid directly
        # as this anonymous sub is called from the loop later
        if (defined $fid) {
            unshift @keys, 'fid';
            unshift @vars, '?';
            unshift @vals, $fid;
        }

        my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
        my $ret = eval { $dbh->do($sql, undef, @vals); };

        if (!$ret) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # See notes in MogileFS::Config->check_database
    my $min_fidid = MogileFS::Config->config('min_fidid');

    # if the fid is in use, do something
    while ($fid_in_use->($fid) || $fid <= $min_fidid) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
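
# Example (illustrative): create_open-style callers register a tempfile with
# named parameters; passing fid => undef lets the database allocate one.
# The key and devids values below are made up.
#
#   my $fidid = $sto->register_tempfile(
#       fid     => undef,
#       dmid    => $dmid,
#       key     => "some/key",
#       classid => 0,
#       devids  => "1,2,3",
#   );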
# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $dmid and $key (dkey).  or undef if no
# row.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE dmid=? AND dkey=?",
                                         undef, $dmid, $key);
}

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid or undef if no row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE fid=?",
                                         undef, $fidid);
}

# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid or undef if no rows.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $count) = @_;
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid > ? LIMIT ?");
    $sth->execute($fromfid,$count);
    return $sth->fetchall_arrayref({});
}

# return array of devids that a fidid is on
sub fid_devids {
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };
}

# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $in = join(",", map { $_+0 } @fidids);
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}

# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
                                         "FROM tempfile WHERE fid=?",
                                         undef, $fidid);
}
# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    });
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}

sub update_device {
    my ($self, $devid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
            . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
            $devid);
    });
    return 1;
}

sub update_device_usage {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(mb_total mb_used devid)], @_);
    eval {
        $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
                       " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
    };
    $self->condthrow;
}
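
# Example (illustrative): methods built on _valid_params take named
# arguments and croak on extras, e.g. a monitor-style usage update:
#
#   $sto->update_device_usage(devid => 7, mb_total => 1024, mb_used => 512);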
# This is unimplemented at the moment as we must verify:
# - no file_on rows exist
# - nothing in file_to_queue is going to attempt to use it
# - nothing in file_to_replicate is going to attempt to use it
# - it's already been marked dead
# - that all trackers are likely to know this :/
# - ensure the devid can't be reused
# IE; the user can't mark it dead then remove it all at once and cause their
# cluster to implode.
sub delete_device {
    die "Unimplemented; needs further testing";
}

sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
                   undef, $fidid);
}

sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    eval {
        $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
    };
    $self->condthrow;
}

sub set_device_state {
    my ($self, $devid, $state) = @_;
    eval {
        $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
    };
    $self->condthrow;
}

sub delete_class {
    my ($self, $dmid, $cid) = @_;
    throw("has_files") if $self->class_has_files($dmid, $cid);
    eval {
        $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
    };
    $self->condthrow;
}

sub delete_fidid {
    my ($self, $fidid) = @_;
    eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    $self->enqueue_for_delete2($fidid, 0);
    $self->condthrow;
}
sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    return $rv;
}

# Load the specified tempfile, then delete it.  If we succeed, we were
# here first; otherwise, someone else beat us here (and we return undef)
sub delete_and_return_tempfile_row {
    my ($self, $fidid) = @_;
    my $rv = $self->tempfile_row_from_fid($fidid);
    my $rows_deleted = $self->delete_tempfile_row($fidid);
    return $rv if ($rows_deleted > 0);
}

sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    eval {
        $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                       "VALUES (?,?,?,?,?,0) ", undef,
                       @arg{'fidid', 'dmid', 'key', 'length', 'classid'});
    };
    $self->condthrow;
}

# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # first is MySQL's error code for duplicates
        if ($self->was_duplicate_error) {
            return 0;
        } else {
            die $@;
        }
    }
    return 1;
}
sub get_domainid_by_name {
    my $self = shift;
    my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
                                             undef, $_[0]);
    return $dmid;
}

# returns a hash of domains.  Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { ($_->[0], $_->[1]) } @{$domains || []};
}

sub get_classid_by_name {
    my $self = shift;
    my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
                                                undef, $_[0], $_[1]);
    return $classid;
}

# returns an array of hashrefs, one hashref per row in the 'class' table
sub get_all_classes {
    my ($self) = @_;
    my (@ret, $row);

    my $repl_col = "";
    if ($self->cached_schema_version >= 10) {
        $repl_col = ", replpolicy";
    }

    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
    $sth->execute;
    push @ret, $row while $row = $sth->fetchrow_hashref;
    return @ret;
}
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # If we are adding an extra file_on entry, we do not want to replace the
    # existing one.  Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}

# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                                   undef, $fidid, $devid); };
    $self->condthrow;
    return $rv;
}

# Test if host exists.
sub get_hostid_by_id {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
                                               undef, $_[0]);
    return $hostid;
}

sub get_hostid_by_name {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
                                               undef, $_[0]);
    return $hostid;
}

# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}

# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);

    eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
                    $ct, $fidid); };
    $self->condthrow;
    return 1;
}

# update the classid for a given fidid
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;

    $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
             $classid, $fidid);

    $self->condthrow;
    return 1;
}

# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    my $nexttry = 0;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + " . int($in);
    }

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# a few near-identical copies of it for now.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}

# enqueue a fidid for work
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 $fidid->[0], $fidid->[1], $fidid->[2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}

# return 1 on success.  die otherwise.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
        return 1;
    }

    $in = 0 unless $in;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        if (ref($fidids->[0]) eq 'ARRAY') {
            my $sql = $self->ignore_replace .
                "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
                join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
            $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
        } else {
            $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
                nexttry) VALUES " .
                join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
        }
    });
    return 1;
}

# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only run
# from a single tracker.
sub file_queue_length {
    my $self = shift;
    my $type = shift;

    return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
                                       "WHERE type = ?", undef, $type);
}

# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;

    $self->retry_on_deadlock(sub {
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                              " WHERE nexttry > " . $self->unix_timestamp);
    });
}
# takes two arguments, devid and limit, both required.  returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}

# finds a chunk of fids given a set of constraints:
# devid, fidid, age (new or old), limit
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing an index.
# returns an arrayref of fidids
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }

    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}

# takes two arguments, fidid to be above, and optional limit (default
# 1,000).  returns up to that many fidids above the provided
# fidid.  returns array of MogileFS::FID objects, sorted by fid ids.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my @ret;
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                            "FROM file ".
                            "WHERE fid > ? ".
                            "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, MogileFS::FID->new_from_db_row($row);
    }
    return @ret;
}
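
# Example (illustrative): fsck-style paging through all fids in batches:
#
#   my $highest = 0;
#   while (my @fids = $sto->get_fids_above_id($highest, 1000)) {
#       $highest = $fids[-1]->id;
#       # ... check each MogileFS::FID object ...
#   }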
# Same as above, but returns a plain arrayref of fidids.
sub get_fidids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file WHERE fid > ?
                                             ORDER BY fid LIMIT $limit}, undef, $fidid);
    return $fidids;
}

# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}
sub update_host {
    my ($self, $hid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
            . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
            $hid);
    });
    return 1;
}

sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}

# return new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
    die "db failure";
}

# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}
1602 # "new" style queue consumption code.
1603 # from within a transaction, fetch a limit of fids,
1604 # then update each fid's nexttry to be off in the future,
1605 # giving local workers some time to dequeue the items.
1607 # DBI (even with RaiseError) returns weird errors on
1608 # deadlocks from selectall_hashref. So we can't do that.
1609 # we also used to retry on deadlock within the routine,
1610 # but instead lets return undef and let job_master retry.
1611 sub grab_queue_chunk
{
1615 my $extfields = shift;
1617 my $dbh = $self->dbh;
1621 return 0 unless $self->lock_queue($queue);
1623 my $extwhere = shift || '';
1624 my $fields = 'fid, nexttry, failcount';
1625 $fields .= ', ' . $extfields if $extfields;
1628 my $ut = $self->unix_timestamp;
1632 WHERE nexttry
<= $ut
1637 $query .= "FOR UPDATE\n" if $self->can_for_update;
1638 my $sth = $dbh->prepare($query);
1640 $work = $sth->fetchall_hashref('fid');
1641 # Nothing to work on.
1642 # Now claim the fids for a while.
1643 # TODO: Should be configurable... but not necessary.
1644 my $fidlist = join(',', keys %$work);
1645 unless ($fidlist) { $dbh->commit; return; }
1646 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1649 $self->unlock_queue($queue);
1650 if ($self->was_deadlock_error) {
1651 eval { $dbh->rollback };
1656 return defined $work ?
values %$work : ();
sub grab_files_to_replicate {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_replicate', $limit,
        'fromdevid, flags');
}

sub grab_files_to_delete2 {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_delete2', $limit);
}

# $extwhere is ugly... but should be fine.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    $what ||= 'type, flags';
    return $self->grab_queue_chunk('file_to_queue', $limit,
        $what, 'AND type = ' . $type);
}
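
# Example (illustrative): a replication worker's consume loop built on the
# grab_* helpers; each grabbed row is a hashref keyed by the selected columns.
#
#   foreach my $row ($sto->grab_files_to_replicate(100)) {
#       # ... replicate $row->{fid} from $row->{fromdevid} ...
#       $sto->delete_fid_from_file_to_replicate($row->{fid});
#   }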
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    1;
}

# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways.  so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
}

sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
        undef, $fidid);
}

sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
        undef, $fidid);
}

sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
        undef, $fidid, $type);
}

sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
    });
}

sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
            undef, $fidid, $type);
    });
}

sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
    });
}

sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}

sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}
# Given a dmid, prefix, after and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;
    $prefix .= '%';
    $after = '' unless defined $after;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}

# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_ago seconds ago or older.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead
    # If we are adding an extra file_on entry, we do not want to replace the
    # existing one.  Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}

sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}
# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
    }) || [] };
}

# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    return 1;
}

sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
            nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    return 1;
}

# clears everything from the fsck_log table
# return 1 on success.  die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid = max($min_logid, $start_max_logid) + 1;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}

sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef,
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;
    return 1;
}

sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}

# returns array of $row hashrefs, from fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}

sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    croak("Bogus options: ".join(',',keys %opts)) if %opts;

    my %ret;
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret{$ev} = $ct;
    }
    return \%ret;
}

# run before daemonizing.  you can die from here if you see something's amiss.  or emit
# warnings.
sub pre_daemonize_checks { }
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout.  dies if more than one lock is already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# MySQL has an issue where you either get excessive deadlocks, or INSERTs
# hang forever around some transactions.  Use ghetto locking to cope.
sub lock_queue   { 1 }
sub unlock_queue { 1 }

# returns up to $limit @fidids which are on provided $devid
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random. and good chances these will
    # eventually get to the point where they're un-rebalance-able, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []
    });

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;
}

1;
__END__

=head1 NAME

MogileFS::Store - data storage provider.  base class.

=head1 ABOUT

MogileFS aims to be database-independent.  The server creates a singleton
instance of a "MogileFS::Store" subclass matching your DSN, such as
L<MogileFS::Store::MySQL>, and all database interaction goes through it.

=head1 SEE ALSO

L<MogileFS::Store::MySQL>

=cut