package MogileFS::Store;
use Carp qw(croak confess);
use MogileFS::Util qw(throw max error);
use DBI;  # no reason a Store has to be DBI-based, but for now they all are.
use List::Util qw(shuffle);
# this is incremented whenever the schema changes.  the server will refuse
# to start up with an old schema version
#
#  6: adds file_to_replicate table
#  7: adds file_to_delete_later table
#  8: adds fsck_log table
#  9: adds 'drain' state to enum in device table
# 10: adds 'replpolicy' column to 'class' table
# 11: adds 'file_to_queue' table
# 12: adds 'file_to_delete2' table
# 13: modifies 'server_settings.value' to TEXT for wider values
#     also adds a TEXT 'arg' column to file_to_queue for passing arguments
# 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
# 15: adds checksum table, adds 'hashtype' column to 'class' table
# 16: adds 'readonly' state to enum in host table
use constant SCHEMA_VERSION => 16;
sub new {
    my ($class) = @_;
    return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
}
sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass, $max_handles) = @_;
    my $subclass;
    if ($dsn =~ /^DBI:mysql:/i) {
        $subclass = "MogileFS::Store::MySQL";
    } elsif ($dsn =~ /^DBI:SQLite:/i) {
        $subclass = "MogileFS::Store::SQLite";
    } elsif ($dsn =~ /^DBI:Oracle:/i) {
        $subclass = "MogileFS::Store::Oracle";
    } elsif ($dsn =~ /^DBI:Pg:/i) {
        $subclass = "MogileFS::Store::Postgres";
    } else {
        die "Unknown database type: $dsn";
    }
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
    }
    my $self = bless {
        dsn    => $dsn,
        user   => $user,
        pass   => $pass,
        max_handles => $max_handles, # Max number of handles to allow
        raise_errors => $subclass->want_raise_errors,
        slave_list_version => 0,
        slave_list_cache => [],
        recheck_req_gen => 0,  # incremented each time a recheck of the dbh is requested
        recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
        handles_left => 0,     # number of times this handle can still be verified
        connected_slaves => {},
        dead_slaves => {},
        dead_backoff => {},    # how many times in a row a slave has died
        connect_timeout => 10, # High default.
    }, $subclass;
    return $self;
}
69 sub want_raise_errors
{
sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});

    my $try_make_sto = sub {
        my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
        }) or return undef;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        return $sto;
    };

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;

    # otherwise, we need to make the requested database, set up permissions, etc.
    $class->status("couldn't connect to database as mogilefs user.  trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        }) or
        die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create/Upgrade database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
}
# given a root DBI connection, create the named database.  succeed
# if it's made, or already exists.  die otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
}
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
}
sub can_replace      { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }
sub can_for_update   { 1 }

sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
139 return "INSERT IGNORE " if $self->can_insertignore;
140 return "REPLACE " if $self->can_replace;
141 die "Can't INSERT IGNORE or REPLACE?";
my $on_status  = sub {};
my $on_confirm = sub { 1 };
sub on_status  { my ($pkg, $code) = @_; $on_status  = $code; };
sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
sub status  { my ($pkg, $msg) = @_; $on_status->($msg);  };
sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
sub latest_schema_version { SCHEMA_VERSION }
sub raise_errors {
    my $self = shift;
    $self->{raise_errors} = 1;
    $self->dbh->{RaiseError} = 1;
}

sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }

sub dsn  { $_[0]{dsn}  }
sub user { $_[0]{user} }
sub pass { $_[0]{pass} }

sub connect_timeout { $_[0]{connect_timeout} }

sub post_dbi_connect { 1 }

sub can_do_slaves { 0 }
174 die "Incapable of becoming slave." unless $self->can_do_slaves;
176 $self->{is_slave
} = 1;
181 return $self->{is_slave
};
sub _slaves_list_changed {
    my $self = shift;
    my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
    if ($ver <= $self->{slave_list_version}) {
        return 0;
    }
    $self->{slave_list_version} = $ver;
    # Restart connections from scratch if the configuration changed.
    $self->{connected_slaves} = {};
    return 1;
}
# Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
sub _slaves_list {
    my $self = shift;
    my $sk = MogileFS::Config->server_setting_cached('slave_keys')
        or return ();

    my @ret;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting_cached("slave_$key");
        unless ($slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }
        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (!defined($dsn) or !defined($user) or !defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @ret, [$dsn, $user, $pass];
    }

    return @ret;
}
sub _pick_slave {
    my $self = shift;
    my @temp = shuffle keys %{$self->{connected_slaves}};
    return unless @temp;
    return $self->{connected_slaves}->{$temp[0]};
}
sub _connect_slave {
    my $self = shift;
    my $slave_fulldsn = shift;

    my $now = time();
    my $dead_retry =
        MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;

    my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
    my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
    return if (defined $dead_timeout
               && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
    return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});

    my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
    $newslave->set_connect_timeout(
        MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
    $self->{slave}->{next_check} = 0;
    $newslave->mark_as_slave;
    if ($self->check_slave) {
        $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
        $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
    } else {
        # Magic numbers are saddening...
        $dead_backoff++ unless $dead_backoff > 20;
        $self->{dead_slaves}->{$slave_fulldsn->[0]}  = $now;
        $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
    }
}
264 die "Incapable of having slaves." unless $self->can_do_slaves;
266 $self->{slave
} = undef;
267 foreach my $slave (keys %{$self->{dead_slaves
}}) {
268 my ($full_dsn) = grep { $slave eq $_->[0] } @
{$self->{slave_list_cache
}};
270 delete $self->{dead_slaves
}->{$slave};
273 $self->_connect_slave($full_dsn);
276 unless ($self->_slaves_list_changed) {
277 if ($self->{slave
} = $self->_pick_slave) {
278 $self->{slave
}->{recheck_req_gen
} = $self->{recheck_req_gen
};
279 return $self->{slave
} if $self->check_slave;
283 if ($self->{slave
}) {
284 my $dsn = $self->{slave
}->{dsn
};
285 $self->{dead_slaves
}->{$dsn} = time();
286 $self->{dead_backoff
}->{$dsn} = 0;
287 delete $self->{connected_slaves
}->{$dsn};
288 error
("Error talking to slave: $dsn");
290 my @slaves_list = $self->_slaves_list;
292 # If we have no slaves, then return silently.
293 return unless @slaves_list;
295 my $slave_skip_filtering = MogileFS
::Config
->server_setting_cached('slave_skip_filtering');
297 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
298 MogileFS
::run_global_hook
('slave_list_filter', \
@slaves_list);
301 $self->{slave_list_cache
} = \
@slaves_list;
303 foreach my $slave_fulldsn (@slaves_list) {
304 $self->_connect_slave($slave_fulldsn);
307 if ($self->{slave
} = $self->_pick_slave) {
308 return $self->{slave
};
310 warn "Slave list exhausted, failing back to master.";
    return $self unless $self->can_do_slaves;

    if ($self->{slave_ok}) {
        if (my $slave = $self->get_slave) {

    return unless ref $coderef eq 'CODE';

    local $self->{slave_ok} = 1;

    return $coderef->(@_);

    $self->{recheck_req_gen}++;

    if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
        $self->{dbh} = undef unless $self->{dbh}->ping;
        # Handles a memory leak under Solaris/Postgres.
        # We may leak a little extra memory if we're holding a lock,
        # since dropping a connection mid-lock is fatal.
        $self->{dbh} = undef if ($self->{max_handles} &&
                                 $self->{handles_left}-- < 0 && !$self->{lock_depth});
        $self->{recheck_done_gen} = $self->{recheck_req_gen};
    }
    return $self->{dbh} if $self->{dbh};

    # Shortcut flag: if the monitor thinks the master is down, avoid attempting to
    # connect to it for now.  If we already have a connection to the master,
    # keep using it as above.
    if (!$self->is_slave) {
        my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
        return if (defined $flag && $flag == 0);
    }

    # auto-reconnect is unsafe if we're holding a lock
    if ($self->{lock_depth}) {
        die "DB connection recovery unsafe, lock held: $self->{last_lock}";
    }

    eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm($self->connect_timeout);
        $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
            # FUTURE: will default to on (have to validate all callers first):
            RaiseError => ($self->{raise_errors} || 0),
            sqlite_use_immediate_transaction => 1,
        });
    };
    if ($@ eq "timeout\n") {
        die "Failed to connect to database: timeout";
    } elsif ($@ || !$self->{dbh}) {
        die "Failed to connect to database: " . DBI->errstr;
    }
    $self->post_dbi_connect;
    $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
sub have_dbh { return 1 if $_[0]->{dbh}; }

    return $self->dbh->ping;

sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    return 1 unless $dbh->err;
    my ($pkg, $fn, $line) = caller;
    my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
    $msg .= ": $optmsg" if $optmsg;
    # Auto rollback failures around transactions.
    if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    return $rv unless $@ || $self->dbh->err;
    warn "Error with SQL: $sql\n";
    Carp::confess($@ || $self->dbh->errstr);
}

sub _valid_params {
    croak("Odd number of parameters!") if scalar(@_) % 2;
    my ($self, $vlist, %uarg) = @_;
    my %ret;
    $ret{$_} = delete $uarg{$_} foreach @$vlist;
    croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
    return %ret;
}
sub was_deadlock_error {
    my $self = shift;
    my $dbh = $self->dbh;

sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;

# run a subref (presumably a database update) in an eval, because you expect it to
# maybe fail on duplicate key error, and throw a dup exception for you, else return
# the return value.
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    throw("dup") if $self->was_duplicate_error;
    return $rv;
}
# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if the duplicate error causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending on your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;
    if ($self->can_insertignore) {
        return $dbh->do("INSERT IGNORE $sql", @params);
    } else {
        # TODO: Detect bad multi-row insert here.
        my $rv = eval { $dbh->do("INSERT $sql", @params); };
        if ($@ || $dbh->err) {
            return 1 if $self->was_duplicate_error;
            # This chunk is identical to condthrow, but we include it directly
            # here as we know there is definitely an error, and we would like
            # to report the caller of this function.
            my ($pkg, $fn, $line) = caller;
            my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
sub retry_on_deadlock {
    my $self  = shift;
    my $code  = shift;
    my $tries = shift || 3;
    croak("deadlock retries must be positive") if $tries < 1;
    my $rv;

    while ($tries-- > 0) {
        $rv = eval { $code->(); };
        next if ($self->was_deadlock_error);
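
# Example (illustrative sketch): the queue-maintenance methods below wrap their
# statements in retry_on_deadlock() so a transient deadlock is simply retried:
#
#   $self->retry_on_deadlock(sub {
#       $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
#   });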
# --------------------------------------------------------------------------

my @extra_tables;

sub add_extra_tables {
    my $class = shift;
    push @extra_tables, @_;
}

use constant TABLES => qw( domain class file tempfile file_to_delete
                           unreachable_fids file_on file_on_corrupt host
                           device server_settings file_to_replicate
                           file_to_delete_later fsck_log file_to_queue
                           file_to_delete2 checksum );
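
# Example (illustrative sketch): a plugin can ask the schema setup to create an
# additional table by registering its name here and providing a matching
# TABLE_<name> method; the table name and SQL below are hypothetical:
#
#   MogileFS::Store->add_extra_tables("plugin_example");
#   sub MogileFS::Store::TABLE_plugin_example {
#       "CREATE TABLE plugin_example (fid INT UNSIGNED NOT NULL PRIMARY KEY)"
#   }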
    my $curver = $sto->schema_version;

    my $latestver = SCHEMA_VERSION;
    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;
    }

    if ($curver > $latestver) {
        die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver.  Aborting to be safe.\n";
    }

    $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");

    foreach my $t (TABLES, @extra_tables) {
        $sto->create_table($t);
    }

    $sto->upgrade_add_host_getport;
    $sto->upgrade_add_host_altip;
    $sto->upgrade_add_device_asof;
    $sto->upgrade_add_device_weight;
    $sto->upgrade_add_device_readonly;
    $sto->upgrade_add_device_drain;
    $sto->upgrade_add_class_replpolicy;
    $sto->upgrade_modify_server_settings_value;
    $sto->upgrade_add_file_to_queue_arg;
    $sto->upgrade_modify_device_size;
    $sto->upgrade_add_class_hashtype;
sub cached_schema_version {
    my $self = shift;
    return $self->{_cached_schema_version} ||=
        $self->schema_version;
}

sub schema_version {
    my $self = shift;
    my $dbh = $self->dbh;
    $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
}
sub filter_create_sql { my ($self, $sql) = @_; return $sql; }

sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);
    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;
    my $imeth = "INDEXES_$table";
    my @indexes = eval { $self->$imeth };
    foreach $sql (@indexes) {
        $self->status("Running SQL: $sql;");
        $dbh->do($sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;
    }
}
# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.

# classes are tied to domains.  domains can have classes of items
# with different mindevcounts.
#
# a minimum devcount is the number of copies the system tries to
# maintain for files in that class
#
# unspecified classname means classid=0 (implicit class), and that
# implies mindevcount=2
    "CREATE TABLE domain (
    dmid          SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
    namespace     VARCHAR(255),

    "CREATE TABLE class (
    dmid          SMALLINT UNSIGNED NOT NULL,
    classid       TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname     VARCHAR(50),
    UNIQUE      (dmid,classname),
    mindevcount   TINYINT UNSIGNED NOT NULL,
    hashtype      TINYINT UNSIGNED

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  the application can handle that.
#
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc.)
# each domain can set up classes and assign the minimum redundancy level for
# each class.  fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
    "CREATE TABLE file (
    fid           INT UNSIGNED NOT NULL,

    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey          VARCHAR(255),      # domain-defined
    UNIQUE dkey  (dmid, dkey),

    length        BIGINT UNSIGNED,   # big limit

    classid       TINYINT UNSIGNED NOT NULL,
    devcount      TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)
647 "CREATE TABLE tempfile (
648 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
651 createtime INT UNSIGNED NOT NULL,
652 classid TINYINT UNSIGNED NOT NULL,
653 dmid SMALLINT UNSIGNED NOT NULL,
659 # files marked for death when their key is overwritten. then they get a new
660 # fid, but since the old row (with the old fid) had to be deleted immediately,
661 # we need a place to store the fid so an async job can delete the file from
663 sub TABLE_file_to_delete
{
664 "CREATE TABLE file_to_delete (
665 fid INT UNSIGNED NOT NULL,
670 # if the replicator notices that a fid has no sources, that file gets inserted
671 # into the unreachable_fids table. it is up to the application to actually
672 # handle fids stored in this table.
673 sub TABLE_unreachable_fids
{
674 "CREATE TABLE unreachable_fids (
675 fid INT UNSIGNED NOT NULL,
676 lastupdate INT UNSIGNED NOT NULL,
# what files are on what devices?  (most likely physical devices,
# as logical devices of RAID arrays would be costly, and mogilefs
# already handles redundancy)
#
# the devid index lets us answer "What files were on this now-dead disk?"
    "CREATE TABLE file_on (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid),

# if the application or framework detects an error in one of the duplicate files
# for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up with an async job
# MAYBE: let the application tell us the SHA1/MD5 of the file for us to check
# on the other devices?
sub TABLE_file_on_corrupt {
    "CREATE TABLE file_on_corrupt (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid)
    )"
}

# hosts (which contain devices...)
    hostid     MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

    status     ENUM('alive','dead','down'),
    http_port  MEDIUMINT UNSIGNED DEFAULT 7500,
    http_get_port MEDIUMINT UNSIGNED,

    hostname   VARCHAR(40),
730 "CREATE TABLE device (
731 devid MEDIUMINT UNSIGNED NOT NULL,
732 hostid MEDIUMINT UNSIGNED NOT NULL,
734 status ENUM('alive','dead','down'),
735 weight MEDIUMINT DEFAULT 100,
737 mb_total INT UNSIGNED,
738 mb_used INT UNSIGNED,
739 mb_asof INT UNSIGNED,
sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field   VARCHAR(50) PRIMARY KEY,
sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unix timestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry    INT UNSIGNED NOT NULL,
    fromdevid  INT UNSIGNED,
    failcount  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags      SMALLINT UNSIGNED NOT NULL DEFAULT 0
sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    fid       INT UNSIGNED NOT NULL PRIMARY KEY,
    delafter  INT UNSIGNED NOT NULL,
779 "CREATE TABLE fsck_log (
780 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
782 utime INT UNSIGNED NOT NULL,
783 fid INT UNSIGNED NULL,
785 devid MEDIUMINT UNSIGNED,
# generic queue table, designed to be used for workers/jobs which aren't
# constantly in use, and are async to the user.
# i.e. fsck, drain, rebalance.
sub TABLE_file_to_queue {
    "CREATE TABLE file_to_queue (
    fid        INT UNSIGNED NOT NULL,

    type       TINYINT UNSIGNED NOT NULL,
    nexttry    INT UNSIGNED NOT NULL,
    failcount  TINYINT UNSIGNED NOT NULL default '0',
    flags      SMALLINT UNSIGNED NOT NULL default '0',

    PRIMARY KEY (fid, type),
    INDEX type_nexttry (type,nexttry)
# new style async delete table.
# this is separate from file_to_queue since deletes are more actively used,
# and partitioning on 'type' doesn't always work so well.
sub TABLE_file_to_delete2 {
    "CREATE TABLE file_to_delete2 (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry    INT UNSIGNED NOT NULL,
    failcount  TINYINT UNSIGNED NOT NULL default '0',
    INDEX nexttry (nexttry)
820 "CREATE TABLE checksum (
821 fid INT UNSIGNED NOT NULL PRIMARY KEY,
822 hashtype TINYINT UNSIGNED NOT NULL,
823 checksum VARBINARY(64) NOT NULL
# these five are only necessary for MySQL, since no other database existed
# before, so they can just create the tables correctly to begin with.
# in the future, there might be new alters that non-MySQL databases
# will have to implement.
sub upgrade_add_host_getport { 1 }
sub upgrade_add_host_altip { 1 }
sub upgrade_add_device_asof { 1 }
sub upgrade_add_device_weight { 1 }
sub upgrade_add_device_readonly { 1 }
sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
sub upgrade_modify_device_size { die "Not implemented in $_[0]" }

sub upgrade_add_class_replpolicy {
    my ($self) = @_;
    unless ($self->column_type("class", "replpolicy")) {
        $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
    }
}

sub upgrade_add_class_hashtype {
    my ($self) = @_;
    unless ($self->column_type("class", "hashtype")) {
        $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
    }
}
# return true if deleted, 0 if didn't exist, exception if error
sub delete_host {
    my ($self, $hostid) = @_;
    return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
}

# return true if deleted, 0 if didn't exist, exception if error
sub delete_domain {
    my ($self, $dmid) = @_;
    my ($err, $rv);
    my $dbh = $self->dbh;

    if ($self->domain_has_files($dmid)) {
        $err = "has_files";
    } elsif ($self->domain_has_classes($dmid)) {
        $err = "has_classes";
    } else {
        $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);

        # remove the "default" class if one was created (for mindevcount)
        # this is currently the only way to delete the "default" class
        $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
    }
    $dbh->rollback if $err;

    $self->condthrow; # will rollback on errors
sub domain_has_files {
    my ($self, $dmid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                                undef, $dmid);
    return $has_a_fid ? 1 : 0;
}

sub domain_has_classes {
    my ($self, $dmid) = @_;
    # queryworker does not permit removing the default class, so domain_has_classes
    # should not register the default class
    my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
                                                  undef, $dmid);
    return defined($has_a_class);
}

sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ? 1 : 0;
}
# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    if ($classname eq 'default') {

    # get the max class id in this domain
    my $maxid = $dbh->selectrow_array
        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;

    # now insert the new class
    $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                   undef, $dmid, $clsid, $classname, 2);

    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            # ensure we're not inside a transaction
            if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
            throw("dup");
        }
    }
    $self->condthrow; # this will rollback on errors
    return $clsid if $rv;
# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
    my $self = shift;
    my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
    $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                   undef, $arg{classname}, $arg{dmid}, $arg{classid});
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_mindevcount {
    my $self = shift;
    my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                   undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_replpolicy {
    my $self = shift;
    my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
    $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
                   undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_hashtype {
    my $self = shift;
    my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
    $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
                   undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
    $self->condthrow;
    return 1;
}
sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);
}
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    if (defined $val) {
        $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}
# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;

    return 1 if $self->dbh->do("UPDATE server_settings ".
                               "SET value=value+? ".
                               "WHERE field=?", undef,
                               $val, $key) > 0;
    $self->set_server_setting($key, $val);
}

sub server_setting {
    my ($self, $key) = @_;
    return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                       undef, $key);
}

sub server_settings {
    my ($self) = @_;
    my %ret;
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;
    while (my ($k, $v) = $sth->fetchrow_array) {
        $ret{$k} = $v;
    }
    return %ret;
}
# register a tempfile and return the fidid, which should be allocated
# using autoincrement/sequences if the passed-in fid is undef.  however,
# if fid is passed in, that value should be used and returned.
#
# return new/passed-in fidid on success.
# throw 'dup' if fid already in use
# return 0/undef/die on failure
#
sub register_tempfile {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    my $ins_tempfile = sub {
        # We must only pass the correct number of bind parameters.
        # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
        # Postgres, where you are expected to leave it out or use DEFAULT.
        # Leaving it out seems sanest and least likely to cause problems
        # with other databases.
        my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
        my @vars = ('?',    '?',    '?',       '?',      $self->unix_timestamp);
        my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
        # Do not check for $explicit_fid_used, but rather $fid directly,
        # as this anonymous sub is called from the loop later.
        if ($fid) {
            unshift @keys, 'fid';
            unshift @vars, '?';
            unshift @vals, $fid;
        }
        my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
        $dbh->do($sql, undef, @vals);
        if ($@ || $dbh->err) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # See notes in MogileFS::Config->check_database
    my $min_fidid = MogileFS::Config->config('min_fidid');

    # if the fid is in use, do something
    while ($fid_in_use->($fid) || $fid <= $min_fidid) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the file table and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->(); # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
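
# Example (illustrative sketch): a create_open-style caller reserves a fid and
# lets the database allocate the id by passing fid => undef; devids is stored
# as a single string value:
#
#   my $fidid = $sto->register_tempfile(
#       fid     => undef,
#       dmid    => $dmid,
#       key     => $key,
#       classid => $classid,
#       devids  => join(',', @devids),
#   );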
# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $dmid and $key (dkey).  or undef if no
# row.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE dmid=? AND dkey=?",
                                         undef, $dmid, $key);
}

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid, or undef if no row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE fid=?",
                                         undef, $fidid);
}

# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid, or undef if no rows.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $count) = @_;
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid > ? LIMIT ?");
    $sth->execute($fromfid,$count);
    return $sth->fetchall_arrayref({});
}
# return array of devids that a fidid is on
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };

# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $in = join(",", map { $_+0 } @fidids);
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}
# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
                                         "FROM tempfile WHERE fid=?",
                                         undef, $fidid);
}
# return 1 on success, throw "dup" on duplicate devid, or throw another error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    });
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}

sub update_device {
    my ($self, $devid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
                       . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
                       $devid);
    });
    return 1;
}
sub update_device_usage {
    my $self = shift;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);

    $self->dbh->do("UPDATE device SET ".
                   "mb_total = ?, mb_used = ?, mb_asof = ?" .
                   " WHERE devid = ?",
                   undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
                   $arg{devid});
    $self->condthrow;
}

# MySQL has an optimized version
sub update_device_usages {
    my ($self, $updates, $cb) = @_;
    foreach my $upd (@$updates) {
        $self->update_device_usage(%$upd);
    }
}
# This is unimplemented at the moment as we must verify:
# - no file_on rows exist
# - nothing in file_to_queue is going to attempt to use it
# - nothing in file_to_replicate is going to attempt to use it
# - it's already been marked dead
# - that all trackers are likely to know this :/
# - ensure the devid can't be reused
# i.e. the user can't mark it dead then remove it all at once and cause their
# cluster to implode.
sub delete_device {
    die "Unimplemented; needs further testing";
}
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
}

sub set_device_state {
    my ($self, $devid, $state) = @_;
    $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
}
sub delete_class {
    my ($self, $dmid, $cid) = @_;
    throw("has_files") if $self->class_has_files($dmid, $cid);

    $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
}
# called from a queryworker process, will trigger delete_fidid_enqueued
# in the delete worker
sub delete_fidid {
    my ($self, $fidid) = @_;
    eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    $self->enqueue_for_delete2($fidid, 0);
    $self->condthrow;
}

# Only called from delete workers (after delete_fidid),
# this reduces client-visible latency from the queryworker
sub delete_fidid_enqueued {
    my ($self, $fidid) = @_;
    eval { $self->delete_checksum($fidid); };
    $self->condthrow;
    eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
    $self->condthrow;
}
sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
    $self->condthrow;
    return $rv;
}

# Load the specified tempfile, then delete it.  If we succeed, we were
# here first; otherwise, someone else beat us here (and we return undef)
sub delete_and_return_tempfile_row {
    my ($self, $fidid) = @_;
    my $rv = $self->tempfile_row_from_fid($fidid);
    my $rows_deleted = $self->delete_tempfile_row($fidid);
    return $rv if ($rows_deleted > 0);
}
sub replace_into_file {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                   "VALUES (?,?,?,?,?,?) ", undef,
                   @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'});
    $self->condthrow;
}
# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # first is MySQL's error code for duplicates
        if ($self->was_duplicate_error) {
            return 0;
        }
    }
sub get_domainid_by_name {
    my $self = shift;
    my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
                                             undef, $_[0]);
    return $dmid;
}

# returns a hash of domains.  Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { ($_->[0], $_->[1]) } @{$domains || []};
}
sub get_classid_by_name {
    my $self = shift;
    my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
                                                undef, $_[0], $_[1]);
    return $classid;
}

# returns an array of hashrefs, one hashref per row in the 'class' table
sub get_all_classes {
    my ($self) = @_;
    my (@ret, $row);

    my @cols = qw/dmid classid classname mindevcount/;
    if ($self->cached_schema_version >= 10) {
        push @cols, 'replpolicy';
        if ($self->cached_schema_version >= 15) {
            push @cols, 'hashtype';
        }
    }
    my $cols = join(', ', @cols);
    my $sth = $self->dbh->prepare("SELECT $cols FROM class");
    $sth->execute;
    push @ret, $row while $row = $sth->fetchrow_hashref;
    return @ret;
}
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead.
    # As if we are adding an extra file_on entry, we do not want to replace the
    # existing one.  Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}
# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                                   undef, $fidid, $devid); };
    $self->condthrow;
    return $rv;
}
# Test if host exists.
sub get_hostid_by_id {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
                                               undef, $_[0]);
    return $hostid;
}

sub get_hostid_by_name {
    my $self = shift;
    my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
                                               undef, $_[0]);
    return $hostid;
}
# get all hosts from the database, returned as a list of hashrefs, each hashref being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}

# get all devices from the database, returned as a list of hashrefs, each hashref being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);

    eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
                    $ct, $fidid); };
    $self->condthrow;
    return 1;
}

# update the classid for a given fidid
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;

    $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
             $classid, $fidid);
    $self->condthrow;
    return 1;
}
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    my $nexttry = 0;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + " . int($in);
    }

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable?  I tried once and it looked too ugly, so we have
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}
# enqueue a fidid for work
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 $fidid->[0], $fidid->[1], $fidid->[2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}
# return 1 on success.  die otherwise.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp . " + " . int($in);

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        if (ref($fidids->[0]) eq 'ARRAY') {
            my $sql = $self->ignore_replace .
                "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
                join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
            $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
        } else {
            $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
                nexttry) VALUES " .
                join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
        }
    });
    return 1;
}
# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only run
# from a single tracker.
sub file_queue_length {
    my $self = shift;
    my $type = shift;

    return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
                                       "WHERE type = ?", undef, $type);
}
# reschedule all deferred replication, return number rescheduled
    $self->retry_on_deadlock(sub {
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                              " WHERE nexttry > " . $self->unix_timestamp);
    });
# takes two arguments, devid and limit, both required.  returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}
# finds a chunk of fids given a set of constraints:
# devid, fidid, age (new or old), limit
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing a new index.
# returns an arrayref of fidids
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }

    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}
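
# Example (illustrative sketch): drain/rebalance-style code pages through the
# oldest fids on a device in chunks, resuming after the last fid already seen:
#
#   my $fids = $sto->get_fidid_chunks_by_device(
#       devid => $devid,
#       age   => 'old',
#       fidid => $last_fid_seen,   # optional; omit on the first page
#       limit => 1000,
#   );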
# gets fidids above fidid_low up to (and including) fidid_high
sub get_fidids_between {
    my ($self, $fidid_low, $fidid_high, $limit) = @_;
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
             WHERE fid > ? and fid <= ?
             ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
    return $fidids;
}
# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}
sub update_host {
    my ($self, $hid, $to_update) = @_;
    my @keys = sort keys %$to_update;
    return unless @keys;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
                       . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
                       $hid);
    });
    return 1;
}

# return the new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
}
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}
1705 # "new" style queue consumption code.
1706 # from within a transaction, fetch a limit of fids,
1707 # then update each fid's nexttry to be off in the future,
1708 # giving local workers some time to dequeue the items.
1710 # DBI (even with RaiseError) returns weird errors on
1711 # deadlocks from selectall_hashref. So we can't do that.
1712 # we also used to retry on deadlock within the routine,
1713 # but instead lets return undef and let job_master retry.
1714 sub grab_queue_chunk
{
1718 my $extfields = shift;
1720 my $dbh = $self->dbh;
1724 return 0 unless $self->lock_queue($queue);
1726 my $extwhere = shift || '';
1727 my $fields = 'fid, nexttry, failcount';
1728 $fields .= ', ' . $extfields if $extfields;
1731 my $ut = $self->unix_timestamp;
1735 WHERE nexttry
<= $ut
1740 $query .= "FOR UPDATE\n" if $self->can_for_update;
1741 my $sth = $dbh->prepare($query);
1743 $work = $sth->fetchall_hashref('fid');
1744 # Nothing to work on.
1745 # Now claim the fids for a while.
1746 # TODO: Should be configurable... but not necessary.
1747 my $fidlist = join(',', keys %$work);
1748 unless ($fidlist) { $dbh->commit; return; }
1749 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1752 if ($self->was_deadlock_error) {
1753 eval { $dbh->rollback };
1758 # FIXME: Super extra paranoia to prevent deadlocking.
1759 # Need to handle or die on all errors above, but $@ can get reset. For now
1760 # we'll just always ensure there's no transaction running at the end here.
1761 # A (near) release should figure the error detection correctly.
1762 if ($dbh->{AutoCommit
} == 0) { eval { $dbh->rollback }; }
1763 $self->unlock_queue($queue);
1765 return defined $work ?
values %$work : ();
sub grab_files_to_replicate {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_replicate', $limit,
                                   'fromdevid, flags');
}

sub grab_files_to_delete2 {
    my ($self, $limit) = @_;
    return $self->grab_queue_chunk('file_to_delete2', $limit);
}

# $extwhere is ugly... but should be fine.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    $what ||= 'type, flags';
    return $self->grab_queue_chunk('file_to_queue', $limit,
                                   $what, 'AND type = ' . $type);
}
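
# Example (illustrative sketch): a worker drains its queue in small batches;
# each returned row hashref carries at least fid, nexttry and failcount:
#
#   foreach my $todo ($sto->grab_files_to_delete2(100)) {
#       # ... delete the file for $todo->{fid}, then:
#       $sto->delete_fid_from_file_to_delete2($todo->{fid});
#   }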
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    my $lockname = "mgfs:fid:$fidid:replicate";
    return 1 if $self->get_lock($lockname, 1);
    return 0;
}

# called when the replicator is done replicating a fid, so you can clean up
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways.  so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    my $lockname = "mgfs:fid:$fidid:replicate";
    $self->release_lock($lockname);
}
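
# Example (illustrative sketch): how the replicator is expected to pair these
# two calls around its work on a single fid:
#
#   if ($sto->should_begin_replicating_fidid($fidid)) {
#       # ... replicate $fidid ...
#       $sto->note_done_replicating($fidid);
#   }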
sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
                                         undef, $fidid);
}

sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
                                         undef, $fidid);
}

sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
                                         undef, $fidid, $type);
}

sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
    });
}

sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
                       undef, $fidid, $type);
    });
}

sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
    });
}
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $abstime, $fid);
    });
}

sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $in_n_secs, $fid);
    });
}

sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $abstime, $fid);
    });
}

sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $in_n_secs, $fid);
    });
}
# Given a dmid, prefix, after, and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;

    # escape underscores, % and \
    $prefix =~ s/([%\\_])/\\$1/g;

    $prefix .= '%';
    $after = '' unless defined $after;

    my $like = $self->get_keys_like_operator;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after);
}

sub get_keys_like_operator { return "LIKE"; }
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_ago seconds ago or older.
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead.
    # As if we are adding an extra file_on entry, we do not want to replace the
    # existing one.  Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}
# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
    }) || [] };
}
# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub,
    # when the first row causes the duplicate error, and the remaining rows are
    # not inserted.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    return 1;
}
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub,
    # when the first row causes the duplicate error, and the remaining rows are
    # not inserted.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
            nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    return 1;
}
# clears everything from the fsck_log table
# return 1 on success.  die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;

    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;

    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}
sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef,
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;
}
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}
# returns array of $row hashrefs, from fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};

    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }

    my %ret;
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret{$ev} = $ct;
    }
    return %ret;
}
# run before daemonizing.  you can die from here if you see something's amiss.  or emit
# warnings.
sub pre_daemonize_checks {
    my $self = shift;

    $self->pre_daemonize_check_slaves;
}

sub pre_daemonize_check_slaves {
    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return;

    my @slaves;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting("slave_$key");
        unless ($slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }
        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (!defined($dsn) or !defined($user) or !defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [$dsn, $user, $pass];
    }

    return unless @slaves; # Escape this block if we don't have a set of slaves anyways
    MogileFS::run_global_hook('slave_list_check', \@slaves);
}
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout.  dies if more than one lock is already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}).  Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# MySQL has an issue where you either get excessive deadlocks, or INSERTs
# hang forever around some transactions.  Use ghetto locking to cope.
sub lock_queue   { 1 }
sub unlock_queue { 1 }

sub BLOB_BIND_TYPE { undef; }
sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;

    my $sth = $dbh->prepare("REPLACE INTO checksum " .
                            "(fid, hashtype, checksum) " .
                            "VALUES (?, ?, ?)");
    $sth->bind_param(1, $fidid);
    $sth->bind_param(2, $hashtype);
    $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
    $sth->execute;
    $self->condthrow;
}

sub get_checksum {
    my ($self, $fidid) = @_;
    $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " .
                                  "FROM checksum WHERE fid = ?",
                                  undef, $fidid);
}

sub delete_checksum {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
}
# setup the value used in a 'nexttry' field to indicate that this item will
# never actually be tried again and will require some sort of manual intervention.
use constant ENDOFTIME => 2147483647;

sub end_of_time { ENDOFTIME; }

# returns the size of the non-urgent replication queue
# nexttry == 0                          - the file is urgent
# nexttry != 0 && nexttry < ENDOFTIME   - the file is deferred
sub deferred_repl_queue_length {
    my ($self) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time);
}
1;
__END__

=head1 NAME

MogileFS::Store - data storage provider.  base class.

=head1 ABOUT

MogileFS aims to be database-independent (though currently as of late
2006 only works with MySQL).  In the future, the server will create a
singleton instance of type "MogileFS::Store", like
L<MogileFS::Store::MySQL>, and all database interaction will be
done through it.

=head1 SEE ALSO

L<MogileFS::Store::MySQL>

=cut