client: always disable watch_read after a command
[MogileFS-Server.git] / lib / MogileFS / Store.pm
blob e45eccc68350a919a81d13542369f80d83ea487b
1 package MogileFS::Store;
2 use strict;
3 use warnings;
4 use Carp qw(croak confess);
5 use MogileFS::Util qw(throw max error);
6 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
7 use List::Util qw(shuffle);
9 # this is incremented whenever the schema changes. server will refuse
10 # to start up with an old schema version
12 # 6: adds file_to_replicate table
13 # 7: adds file_to_delete_later table
14 # 8: adds fsck_log table
15 # 9: adds 'drain' state to enum in device table
16 # 10: adds 'replpolicy' column to 'class' table
17 # 11: adds 'file_to_queue' table
18 # 12: adds 'file_to_delete2' table
19 # 13: modifies 'server_settings.value' to TEXT for wider values
20 # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21 # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22 # 15: adds checksum table, adds 'hashtype' column to 'class' table
23 # 16: adds 'readonly' state to enum in host table
24 use constant SCHEMA_VERSION => 16;
26 sub new {
27 my ($class) = @_;
28 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
31 sub new_from_dsn_user_pass {
32 my ($class, $dsn, $user, $pass, $max_handles) = @_;
33 my $subclass;
34 if ($dsn =~ /^DBI:mysql:/i) {
35 $subclass = "MogileFS::Store::MySQL";
36 } elsif ($dsn =~ /^DBI:SQLite:/i) {
37 $subclass = "MogileFS::Store::SQLite";
38 } elsif ($dsn =~ /^DBI:Oracle:/i) {
39 $subclass = "MogileFS::Store::Oracle";
40 } elsif ($dsn =~ /^DBI:Pg:/i) {
41 $subclass = "MogileFS::Store::Postgres";
42 } else {
43 die "Unknown database type: $dsn";
45 unless (eval "use $subclass; 1") {
46 die "Error loading $subclass: $@\n";
48 my $self = bless {
49 dsn => $dsn,
50 user => $user,
51 pass => $pass,
52 max_handles => $max_handles, # Max number of handles to allow
53 raise_errors => $subclass->want_raise_errors,
54 slave_list_version => 0,
55 slave_list_cache => [],
56 recheck_req_gen => 0, # incremented each time a recheck of the dbh is requested
57 recheck_done_gen => 0, # once a recheck is done, copy of the request generation it satisfied
58 handles_left => 0, # number of times this handle can still be verified
59 connected_slaves => {},
60 dead_slaves => {},
61 dead_backoff => {}, # how many times in a row a slave has died
62 connect_timeout => 10, # High default.
63 }, $subclass;
64 $self->init;
65 return $self;
68 # Defaults to true now.
69 sub want_raise_errors {
73 sub new_from_mogdbsetup {
74 my ($class, %args) = @_;
75 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
76 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
78 my $try_make_sto = sub {
79 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
80 PrintError => 0,
81 }) or return undef;
82 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
83 $sto->raise_errors;
84 return $sto;
87 # upgrading, apparently, as this database already exists.
88 my $sto = $try_make_sto->();
89 return $sto if $sto;
91 # otherwise, we need to make the requested database, setup permissions, etc
92 $class->status("couldn't connect to database as mogilefs user. trying root...");
93 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
94 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
95 PrintError => 0,
96 }) or
97 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
98 $class->status("connected to database as root user.");
100 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
101 $class->create_db_if_not_exists($rdbh, $args{dbname});
102 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
103 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
105 # should be ready now:
106 $sto = $try_make_sto->();
107 return $sto if $sto;
109 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
112 # given a root DBI connection, create the named database. succeed
113 # if it's made, or already exists. die otherwise.
114 sub create_db_if_not_exists {
115 my ($pkg, $rdbh, $dbname) = @_;
116 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
117 or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
120 sub grant_privileges {
121 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
122 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
123 undef, $pass)
124 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
125 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
126 undef, $pass)
127 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
130 sub can_replace { 0 }
131 sub can_insertignore { 0 }
132 sub can_insert_multi { 0 }
133 sub can_for_update { 1 }
135 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
137 sub ignore_replace {
138 my $self = shift;
139 return "INSERT IGNORE " if $self->can_insertignore;
140 return "REPLACE " if $self->can_replace;
141 die "Can't INSERT IGNORE or REPLACE?";
144 my $on_status = sub {};
145 my $on_confirm = sub { 1 };
146 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
147 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
148 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
149 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
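# Illustrative sketch (not part of the original module): a setup tool such as
# mogdbsetup can register these hooks; prompt_yn here is a hypothetical helper:
#
#   MogileFS::Store->on_status(sub { print "[status] $_[0]\n" });
#   MogileFS::Store->on_confirm(sub { prompt_yn($_[0]) });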
151 sub latest_schema_version { SCHEMA_VERSION }
153 sub raise_errors {
154 my $self = shift;
155 $self->{raise_errors} = 1;
156 $self->dbh->{RaiseError} = 1;
159 sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }
161 sub dsn { $_[0]{dsn} }
162 sub user { $_[0]{user} }
163 sub pass { $_[0]{pass} }
165 sub connect_timeout { $_[0]{connect_timeout} }
167 sub init { 1 }
168 sub post_dbi_connect { 1 }
170 sub can_do_slaves { 0 }
172 sub mark_as_slave {
173 my $self = shift;
174 die "Incapable of becoming slave." unless $self->can_do_slaves;
176 $self->{is_slave} = 1;
179 sub is_slave {
180 my $self = shift;
181 return $self->{is_slave};
184 sub _slaves_list_changed {
185 my $self = shift;
186 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
187 if ($ver <= $self->{slave_list_version}) {
188 return 0;
190 $self->{slave_list_version} = $ver;
191 # Restart connections from scratch if the configuration changed.
192 $self->{connected_slaves} = {};
193 return 1;
196 # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
197 sub _slaves_list {
198 my $self = shift;
199 my $now = time();
201 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
202 or return ();
204 my @ret;
205 foreach my $key (split /\s*,\s*/, $sk) {
206 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
208 if (!$slave) {
209 error("key for slave DB config: slave_$key not found in configuration");
210 next;
213 my ($dsn, $user, $pass) = split /\|/, $slave;
214 if (!defined($dsn) or !defined($user) or !defined($pass)) {
215 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
216 next;
218 push @ret, [$dsn, $user, $pass]
221 return @ret;
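# Illustrative sketch (not part of the original module): the server_settings
# rows this expects look roughly like the following, where "backup1" is a
# made-up key name:
#
#   slave_keys    -> "backup1"
#   slave_backup1 -> "DBI:mysql:mogilefs;host=192.0.2.10|mogileuser|sekrit"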
224 sub _pick_slave {
225 my $self = shift;
226 my @temp = shuffle keys %{$self->{connected_slaves}};
227 return unless @temp;
228 return $self->{connected_slaves}->{$temp[0]};
231 sub _connect_slave {
232 my $self = shift;
233 my $slave_fulldsn = shift;
234 my $now = time();
236 my $dead_retry =
237 MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
239 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
240 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
241 return if (defined $dead_timeout
242 && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
243 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
245 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
246 $newslave->set_connect_timeout(
247 MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
248 $self->{slave}->{next_check} = 0;
249 $newslave->mark_as_slave;
250 if ($self->check_slave) {
251 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
252 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
253 } else {
254 # Magic numbers are saddening...
255 $dead_backoff++ unless $dead_backoff > 20;
256 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
257 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
261 sub get_slave {
262 my $self = shift;
264 die "Incapable of having slaves." unless $self->can_do_slaves;
266 $self->{slave} = undef;
267 foreach my $slave (keys %{$self->{dead_slaves}}) {
268 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
269 unless ($full_dsn) {
270 delete $self->{dead_slaves}->{$slave};
271 next;
273 $self->_connect_slave($full_dsn);
276 unless ($self->_slaves_list_changed) {
277 if ($self->{slave} = $self->_pick_slave) {
278 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
279 return $self->{slave} if $self->check_slave;
283 if ($self->{slave}) {
284 my $dsn = $self->{slave}->{dsn};
285 $self->{dead_slaves}->{$dsn} = time();
286 $self->{dead_backoff}->{$dsn} = 0;
287 delete $self->{connected_slaves}->{$dsn};
288 error("Error talking to slave: $dsn");
290 my @slaves_list = $self->_slaves_list;
292 # If we have no slaves, then return silently.
293 return unless @slaves_list;
295 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');
297 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
298 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
301 $self->{slave_list_cache} = \@slaves_list;
303 foreach my $slave_fulldsn (@slaves_list) {
304 $self->_connect_slave($slave_fulldsn);
307 if ($self->{slave} = $self->_pick_slave) {
308 return $self->{slave};
310 warn "Slave list exhausted, failing back to master.";
311 return;
314 sub read_store {
315 my $self = shift;
317 return $self unless $self->can_do_slaves;
319 if ($self->{slave_ok}) {
320 if (my $slave = $self->get_slave) {
321 return $slave;
325 return $self;
328 sub slaves_ok {
329 my $self = shift;
330 my $coderef = shift;
332 return unless ref $coderef eq 'CODE';
334 local $self->{slave_ok} = 1;
336 return $coderef->(@_);
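# Illustrative sketch (not part of the original module): read paths wrap their
# queries in slaves_ok so read_store() may hand back a connected slave; $dmid
# and $key here are hypothetical:
#
#   my $row = $sto->slaves_ok(sub {
#       $sto->read_store->file_row_from_dmid_key($dmid, $key);
#   });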
339 sub recheck_dbh {
340 my $self = shift;
341 $self->{recheck_req_gen}++;
344 sub dbh {
345 my $self = shift;
347 if ($self->{dbh}) {
348 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
349 $self->{dbh} = undef unless $self->{dbh}->ping;
350 # Handles a memory leak under Solaris/Postgres.
351 # We may leak a little extra memory if we're holding a lock,
352 # since dropping a connection mid-lock is fatal
353 $self->{dbh} = undef if ($self->{max_handles} &&
354 $self->{handles_left}-- < 0 && !$self->{lock_depth});
355 $self->{recheck_done_gen} = $self->{recheck_req_gen};
357 return $self->{dbh} if $self->{dbh};
360 # Shortcut flag: if monitor thinks the master is down, avoid attempting to
361 # connect to it for now. If we already have a connection to the master,
362 # keep using it as above.
363 if (!$self->is_slave) {
364 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
365 return if (defined $flag && $flag == 0);
368 # auto-reconnect is unsafe if we're holding a lock
369 if ($self->{lock_depth}) {
370 die "DB connection recovery unsafe, lock held: $self->{last_lock}";
373 eval {
374 local $SIG{ALRM} = sub { die "timeout\n" };
375 alarm($self->connect_timeout);
376 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
377 PrintError => 0,
378 AutoCommit => 1,
379 # FUTURE: will default to on (have to validate all callers first):
380 RaiseError => ($self->{raise_errors} || 0),
381 sqlite_use_immediate_transaction => 1,
384 alarm(0);
385 if ($@ eq "timeout\n") {
386 die "Failed to connect to database: timeout";
387 } elsif ($@) {
388 die "Failed to connect to database: " . DBI->errstr;
390 $self->post_dbi_connect;
391 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
392 return $self->{dbh};
395 sub have_dbh { return 1 if $_[0]->{dbh}; }
397 sub ping {
398 my $self = shift;
399 return $self->dbh->ping;
402 sub condthrow {
403 my ($self, $optmsg) = @_;
404 my $dbh = $self->dbh;
405 return 1 unless $dbh->err;
406 my ($pkg, $fn, $line) = caller;
407 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
408 $msg .= ": $optmsg" if $optmsg;
409 # Automatically roll back if the failure happened inside a transaction.
410 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
411 croak($msg);
414 sub dowell {
415 my ($self, $sql, @do_params) = @_;
416 my $rv = eval { $self->dbh->do($sql, @do_params) };
417 return $rv unless $@ || $self->dbh->err;
418 warn "Error with SQL: $sql\n";
419 Carp::confess($@ || $self->dbh->errstr);
422 sub _valid_params {
423 croak("Odd number of parameters!") if scalar(@_) % 2;
424 my ($self, $vlist, %uarg) = @_;
425 my %ret;
426 $ret{$_} = delete $uarg{$_} foreach @$vlist;
427 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
428 return %ret;
431 sub was_deadlock_error {
432 my $self = shift;
433 my $dbh = $self->dbh;
434 die "UNIMPLEMENTED";
437 sub was_duplicate_error {
438 my $self = shift;
439 my $dbh = $self->dbh;
440 die "UNIMPLEMENTED";
443 # run a subref (presumably a database update) in an eval, because you expect it
444 # might fail with a duplicate key error; in that case a "dup" exception is thrown
445 # for you, otherwise its return value is returned
446 sub conddup {
447 my ($self, $code) = @_;
448 my $rv = eval { $code->(); };
449 throw("dup") if $self->was_duplicate_error;
450 croak($@) if $@;
451 return $rv;
454 # insert row if doesn't already exist
455 # WARNING: This function is NOT transaction safe if the duplicate error causes
456 # your transaction to halt!
457 # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
458 # is false! Rows before the duplicate will be inserted, but rows after the
459 # duplicate might not be, depending on your database.
460 sub insert_ignore {
461 my ($self, $sql, @params) = @_;
462 my $dbh = $self->dbh;
463 if ($self->can_insertignore) {
464 return $dbh->do("INSERT IGNORE $sql", @params);
465 } else {
466 # TODO: Detect bad multi-row insert here.
467 my $rv = eval { $dbh->do("INSERT $sql", @params); };
468 if ($@ || $dbh->err) {
469 return 1 if $self->was_duplicate_error;
470 # This chunk is identical to condthrow, but we include it directly
471 # here as we know there is definitely an error, and we want the error
472 # to be reported as coming from the caller of this function.
473 my ($pkg, $fn, $line) = caller;
474 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
475 croak($msg);
477 return $rv;
481 sub retry_on_deadlock {
482 my $self = shift;
483 my $code = shift;
484 my $tries = shift || 3;
485 croak("deadlock retries must be positive") if $tries < 1;
486 my $rv;
488 while ($tries-- > 0) {
489 $rv = eval { $code->(); };
490 next if ($self->was_deadlock_error);
491 croak($@) if $@;
492 last;
494 return $rv;
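# Illustrative sketch (not part of the original module): statements that may
# deadlock are wrapped like this (mirroring delete_fid_from_file_to_replicate
# further below):
#
#   $sto->retry_on_deadlock(sub {
#       $sto->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fid);
#   });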
497 # --------------------------------------------------------------------------
499 my @extra_tables;
501 sub add_extra_tables {
502 my $class = shift;
503 push @extra_tables, @_;
506 use constant TABLES => qw( domain class file tempfile file_to_delete
507 unreachable_fids file_on file_on_corrupt host
508 device server_settings file_to_replicate
509 file_to_delete_later fsck_log file_to_queue
510 file_to_delete2 checksum);
512 sub setup_database {
513 my $sto = shift;
515 my $curver = $sto->schema_version;
517 my $latestver = SCHEMA_VERSION;
518 if ($curver == $latestver) {
519 $sto->status("Schema already up-to-date at version $curver.");
520 return 1;
523 if ($curver > $latestver) {
524 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
527 if ($curver) {
528 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
531 foreach my $t (TABLES, @extra_tables) {
532 $sto->create_table($t);
535 $sto->upgrade_add_host_getport;
536 $sto->upgrade_add_host_altip;
537 $sto->upgrade_add_device_asof;
538 $sto->upgrade_add_device_weight;
539 $sto->upgrade_add_device_readonly;
540 $sto->upgrade_add_device_drain;
541 $sto->upgrade_add_class_replpolicy;
542 $sto->upgrade_modify_server_settings_value;
543 $sto->upgrade_add_file_to_queue_arg;
544 $sto->upgrade_modify_device_size;
545 $sto->upgrade_add_class_hashtype;
547 return 1;
550 sub cached_schema_version {
551 my $self = shift;
552 return $self->{_cached_schema_version} ||=
553 $self->schema_version;
556 sub schema_version {
557 my $self = shift;
558 my $dbh = $self->dbh;
559 return eval {
560 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
561 } || 0;
564 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
566 sub create_table {
567 my ($self, $table) = @_;
568 my $dbh = $self->dbh;
569 return 1 if $self->table_exists($table);
570 my $meth = "TABLE_$table";
571 my $sql = $self->$meth;
572 $sql = $self->filter_create_sql($sql);
573 $self->status("Running SQL: $sql;");
574 $dbh->do($sql) or
575 die "Failed to create table $table: " . $dbh->errstr;
576 my $imeth = "INDEXES_$table";
577 my @indexes = eval { $self->$imeth };
578 foreach $sql (@indexes) {
579 $self->status("Running SQL: $sql;");
580 $dbh->do($sql) or
581 die "Failed to create indexes on $table: " . $dbh->errstr;
585 # Please try to keep all tables aligned nicely
586 # with '"CREATE TABLE' on the first line
587 # and ')"' alone on the last line.
589 sub TABLE_domain {
590 # classes are tied to domains. domains can have classes of items
591 # with different mindevcounts.
593 # a minimum devcount is the number of copies the system tries to
594 # maintain for files in that class
596 # unspecified classname means classid=0 (implicit class), and that
597 # implies mindevcount=2
598 "CREATE TABLE domain (
599 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
600 namespace VARCHAR(255),
601 UNIQUE (namespace)
605 sub TABLE_class {
606 "CREATE TABLE class (
607 dmid SMALLINT UNSIGNED NOT NULL,
608 classid TINYINT UNSIGNED NOT NULL,
609 PRIMARY KEY (dmid,classid),
610 classname VARCHAR(50),
611 UNIQUE (dmid,classname),
612 mindevcount TINYINT UNSIGNED NOT NULL,
613 hashtype TINYINT UNSIGNED
617 # the length field is only here for easy verifications of content
618 # integrity when copying around. no sums or content types or other
619 # metadata here. application can handle that.
621 # classid is what class of file this belongs to. for instance, on fotobilder
622 # there will be a class for original pictures (the ones the user uploaded)
623 # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
624 # each domain can setup classes and assign the minimum redundancy level for
625 # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
626 # photos and a 1 minimum for derived images (which means the sole device
627 # for a derived image can die, bringing devcount to 0 for that file, but
628 # the application can recreate it from its original)
629 sub TABLE_file {
630 "CREATE TABLE file (
631 fid INT UNSIGNED NOT NULL,
632 PRIMARY KEY (fid),
634 dmid SMALLINT UNSIGNED NOT NULL,
635 dkey VARCHAR(255), # domain-defined
636 UNIQUE dkey (dmid, dkey),
638 length BIGINT UNSIGNED, # big limit
640 classid TINYINT UNSIGNED NOT NULL,
641 devcount TINYINT UNSIGNED NOT NULL,
642 INDEX devcount (dmid,classid,devcount)
646 sub TABLE_tempfile {
647 "CREATE TABLE tempfile (
648 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
649 PRIMARY KEY (fid),
651 createtime INT UNSIGNED NOT NULL,
652 classid TINYINT UNSIGNED NOT NULL,
653 dmid SMALLINT UNSIGNED NOT NULL,
654 dkey VARCHAR(255),
655 devids VARCHAR(60)
659 # files marked for death when their key is overwritten. then they get a new
660 # fid, but since the old row (with the old fid) had to be deleted immediately,
661 # we need a place to store the fid so an async job can delete the file from
662 # all devices.
663 sub TABLE_file_to_delete {
664 "CREATE TABLE file_to_delete (
665 fid INT UNSIGNED NOT NULL,
666 PRIMARY KEY (fid)
670 # if the replicator notices that a fid has no sources, that file gets inserted
671 # into the unreachable_fids table. it is up to the application to actually
672 # handle fids stored in this table.
673 sub TABLE_unreachable_fids {
674 "CREATE TABLE unreachable_fids (
675 fid INT UNSIGNED NOT NULL,
676 lastupdate INT UNSIGNED NOT NULL,
677 PRIMARY KEY (fid),
678 INDEX (lastupdate)
682 # what files are on what devices? (most likely physical devices,
683 # as logical devices of RAID arrays would be costly, and mogilefs
684 # already handles redundancy)
686 # the devid index lets us answer "What files were on this now-dead disk?"
687 sub TABLE_file_on {
688 "CREATE TABLE file_on (
689 fid INT UNSIGNED NOT NULL,
690 devid MEDIUMINT UNSIGNED NOT NULL,
691 PRIMARY KEY (fid, devid),
692 INDEX (devid)
696 # if application or framework detects an error in one of the duplicate files
697 # for whatever reason, it can register its complaint and the framework
698 # will do some verifications and fix things up w/ an async job
699 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
700 # on the other devices?
701 sub TABLE_file_on_corrupt {
702 "CREATE TABLE file_on_corrupt (
703 fid INT UNSIGNED NOT NULL,
704 devid MEDIUMINT UNSIGNED NOT NULL,
705 PRIMARY KEY (fid, devid)
709 # hosts (which contain devices...)
710 sub TABLE_host {
711 "CREATE TABLE host (
712 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
714 status ENUM('alive','dead','down'),
715 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
716 http_get_port MEDIUMINT UNSIGNED,
718 hostname VARCHAR(40),
719 hostip VARCHAR(15),
720 altip VARCHAR(15),
721 altmask VARCHAR(18),
722 UNIQUE (hostname),
723 UNIQUE (hostip),
724 UNIQUE (altip)
728 # disks...
729 sub TABLE_device {
730 "CREATE TABLE device (
731 devid MEDIUMINT UNSIGNED NOT NULL,
732 hostid MEDIUMINT UNSIGNED NOT NULL,
734 status ENUM('alive','dead','down'),
735 weight MEDIUMINT DEFAULT 100,
737 mb_total INT UNSIGNED,
738 mb_used INT UNSIGNED,
739 mb_asof INT UNSIGNED,
740 PRIMARY KEY (devid),
741 INDEX (status)
745 sub TABLE_server_settings {
746 "CREATE TABLE server_settings (
747 field VARCHAR(50) PRIMARY KEY,
748 value TEXT
752 sub TABLE_file_to_replicate {
753 # nexttry is time to try to replicate it next.
754 # 0 means immediate. it's only on one host.
755 # 1 means lower priority. it's on 2+ but isn't happy where it's at.
756 # unix timestamp means at/after that time. some previous error occurred.
757 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
758 # failcount. how many times we've failed, just for doing backoff of nexttry.
759 # flags. reserved for future use.
760 "CREATE TABLE file_to_replicate (
761 fid INT UNSIGNED NOT NULL PRIMARY KEY,
762 nexttry INT UNSIGNED NOT NULL,
763 INDEX (nexttry),
764 fromdevid INT UNSIGNED,
765 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
766 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
770 sub TABLE_file_to_delete_later {
771 "CREATE TABLE file_to_delete_later (
772 fid INT UNSIGNED NOT NULL PRIMARY KEY,
773 delafter INT UNSIGNED NOT NULL,
774 INDEX (delafter)
778 sub TABLE_fsck_log {
779 "CREATE TABLE fsck_log (
780 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
781 PRIMARY KEY (logid),
782 utime INT UNSIGNED NOT NULL,
783 fid INT UNSIGNED NULL,
784 evcode CHAR(4),
785 devid MEDIUMINT UNSIGNED,
786 INDEX(utime)
790 # generic queue table, designed to be used for workers/jobs which aren't
791 # constantly in use, and are async to the user.
792 # i.e. fsck, drain, rebalance.
793 sub TABLE_file_to_queue {
794 "CREATE TABLE file_to_queue (
795 fid INT UNSIGNED NOT NULL,
796 devid INT UNSIGNED,
797 type TINYINT UNSIGNED NOT NULL,
798 nexttry INT UNSIGNED NOT NULL,
799 failcount TINYINT UNSIGNED NOT NULL default '0',
800 flags SMALLINT UNSIGNED NOT NULL default '0',
801 arg TEXT,
802 PRIMARY KEY (fid, type),
803 INDEX type_nexttry (type,nexttry)
807 # new style async delete table.
808 # this is separate from file_to_queue since deletes are more actively used,
809 # and partitioning on 'type' doesn't always work so well.
810 sub TABLE_file_to_delete2 {
811 "CREATE TABLE file_to_delete2 (
812 fid INT UNSIGNED NOT NULL PRIMARY KEY,
813 nexttry INT UNSIGNED NOT NULL,
814 failcount TINYINT UNSIGNED NOT NULL default '0',
815 INDEX nexttry (nexttry)
819 sub TABLE_checksum {
820 "CREATE TABLE checksum (
821 fid INT UNSIGNED NOT NULL PRIMARY KEY,
822 hashtype TINYINT UNSIGNED NOT NULL,
823 checksum VARBINARY(64) NOT NULL
827 # these five are only necessary for MySQL, since no other database existed
828 # before, so they can just create the tables correctly to begin with.
829 # in the future, there might be new alters that non-MySQL databases
830 # will have to implement.
831 sub upgrade_add_host_getport { 1 }
832 sub upgrade_add_host_altip { 1 }
833 sub upgrade_add_device_asof { 1 }
834 sub upgrade_add_device_weight { 1 }
835 sub upgrade_add_device_readonly { 1 }
836 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
837 sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
838 sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
839 sub upgrade_modify_device_size { die "Not implemented in $_[0]" }
841 sub upgrade_add_class_replpolicy {
842 my ($self) = @_;
843 unless ($self->column_type("class", "replpolicy")) {
844 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
848 sub upgrade_add_class_hashtype {
849 my ($self) = @_;
850 unless ($self->column_type("class", "hashtype")) {
851 $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
855 # return true if deleted, 0 if didn't exist, exception if error
856 sub delete_host {
857 my ($self, $hostid) = @_;
858 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
861 # return true if deleted, 0 if didn't exist, exception if error
862 sub delete_domain {
863 my ($self, $dmid) = @_;
864 my ($err, $rv);
865 my $dbh = $self->dbh;
866 eval {
867 $dbh->begin_work;
868 if ($self->domain_has_files($dmid)) {
869 $err = "has_files";
870 } elsif ($self->domain_has_classes($dmid)) {
871 $err = "has_classes";
872 } else {
873 $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
875 # remove the "default" class if one was created (for mindevcount)
876 # this is currently the only way to delete the "default" class
877 $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
878 $dbh->commit;
880 $dbh->rollback if $err;
882 $self->condthrow; # will rollback on errors
883 throw($err) if $err;
884 return $rv;
887 sub domain_has_files {
888 my ($self, $dmid) = @_;
889 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
890 undef, $dmid);
891 return $has_a_fid ? 1 : 0;
894 sub domain_has_classes {
895 my ($self, $dmid) = @_;
896 # queryworker does not permit removing the default class, so domain_has_classes
897 # should not count the default class
898 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
899 undef, $dmid);
900 return defined($has_a_class);
903 sub class_has_files {
904 my ($self, $dmid, $clid) = @_;
905 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
906 undef, $dmid, $clid);
907 return $has_a_fid ? 1 : 0;
910 # return new classid on success (non-zero integer), die on failure
911 # throw 'dup' on duplicate name
912 sub create_class {
913 my ($self, $dmid, $classname) = @_;
914 my $dbh = $self->dbh;
916 my ($clsid, $rv);
918 eval {
919 $dbh->begin_work;
920 if ($classname eq 'default') {
921 $clsid = 0;
922 } else {
923 # get the max class id in this domain
924 my $maxid = $dbh->selectrow_array
925 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
926 $clsid = $maxid + 1;
928 # now insert the new class
929 $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
930 undef, $dmid, $clsid, $classname, 2);
931 $dbh->commit if $rv;
933 if ($@ || $dbh->err) {
934 if ($self->was_duplicate_error) {
935 # ensure we're not inside a transaction
936 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
937 throw("dup");
940 $self->condthrow; # this will rollback on errors
941 return $clsid if $rv;
942 die;
945 # return 1 on success, throw "dup" on duplicate name error, die otherwise
946 sub update_class_name {
947 my $self = shift;
948 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
949 my $rv = eval {
950 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
951 undef, $arg{classname}, $arg{dmid}, $arg{classid});
953 throw("dup") if $self->was_duplicate_error;
954 $self->condthrow;
955 return 1;
958 # return 1 on success, die otherwise
959 sub update_class_mindevcount {
960 my $self = shift;
961 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
962 eval {
963 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
964 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
966 $self->condthrow;
967 return 1;
970 # return 1 on success, die otherwise
971 sub update_class_replpolicy {
972 my $self = shift;
973 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
974 eval {
975 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
976 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
978 $self->condthrow;
979 return 1;
982 # return 1 on success, die otherwise
983 sub update_class_hashtype {
984 my $self = shift;
985 my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
986 eval {
987 $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
988 undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
990 $self->condthrow;
993 sub nfiles_with_dmid_classid_devcount {
994 my ($self, $dmid, $classid, $devcount) = @_;
995 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
996 undef, $dmid, $classid, $devcount);
999 sub set_server_setting {
1000 my ($self, $key, $val) = @_;
1001 my $dbh = $self->dbh;
1002 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
1004 eval {
1005 if (defined $val) {
1006 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
1007 } else {
1008 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
1012 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
1013 return 1;
1016 # FIXME: racy. it doesn't matter for the current sole caller, but should be fixed.
1017 sub incr_server_setting {
1018 my ($self, $key, $val) = @_;
1019 $val = 1 unless defined $val;
1020 return unless $val;
1022 return 1 if $self->dbh->do("UPDATE server_settings ".
1023 "SET value=value+? ".
1024 "WHERE field=?", undef,
1025 $val, $key) > 0;
1026 $self->set_server_setting($key, $val);
1029 sub server_setting {
1030 my ($self, $key) = @_;
1031 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
1032 undef, $key);
1035 sub server_settings {
1036 my ($self) = @_;
1037 my $ret = {};
1038 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
1039 $sth->execute;
1040 while (my ($k, $v) = $sth->fetchrow_array) {
1041 $ret->{$k} = $v;
1043 return $ret;
1046 # register a tempfile and return the fidid, which should be allocated
1047 # using autoincrement/sequences if the passed in fid is undef. however,
1048 # if fid is passed in, that value should be used and returned.
1050 # return new/passed in fidid on success.
1051 # throw 'dup' if fid already in use
1052 # return 0/undef/die on failure
1054 sub register_tempfile {
1055 my $self = shift;
1056 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
1058 my $dbh = $self->dbh;
1059 my $fid = $arg{fid};
1061 my $explicit_fid_used = $fid ? 1 : 0;
1063 # setup the new mapping. we store the devices that we picked for
1064 # this file in here, knowing that they might not be used. create_close
1065 # is responsible for actually mapping in file_on. NOTE: fid is being
1066 # passed in, it's either some number they gave us, or it's going to be
1067 # 0/undef which translates into NULL which means to automatically create
1068 # one. that should be fine.
1069 my $ins_tempfile = sub {
1070 my $rv = eval {
1071 # We must only pass the correct number of bind parameters
1072 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
1073 # Postgres, where you are expected to leave it out or use DEFAULT
1074 # Leaving it out seems sanest and least likely to cause problems
1075 # with other databases.
1076 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
1077 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
1078 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
1079 # Do not check for $explicit_fid_used, but rather $fid directly
1080 # as this anonymous sub is called from the loop later
1081 if($fid) {
1082 unshift @keys, 'fid';
1083 unshift @vars, '?';
1084 unshift @vals, $fid;
1086 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
1087 $dbh->do($sql, undef, @vals);
1089 if (!$rv) {
1090 return undef if $self->was_duplicate_error;
1091 die "Unexpected db error into tempfile: " . $dbh->errstr;
1094 unless (defined $fid) {
1095 # if they did not give us a fid, then we want to grab the one that was
1096 # theoretically automatically generated
1097 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
1098 or die "No last_insert_id found";
1100 return undef unless defined $fid && $fid > 0;
1101 return 1;
1104 unless ($ins_tempfile->()) {
1105 throw("dup") if $explicit_fid_used;
1106 die "tempfile insert failed";
1109 my $fid_in_use = sub {
1110 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
1111 return $exists ? 1 : 0;
1114 # See notes in MogileFS::Config->check_database
1115 my $min_fidid = MogileFS::Config->config('min_fidid');
1117 # if the fid is in use or not above min_fidid, re-seed and grab a new one
1118 while ($fid_in_use->($fid) || $fid <= $min_fidid) {
1119 throw("dup") if $explicit_fid_used;
1121 # be careful of databases which reset their
1122 # auto-increment/sequences when the table is empty (InnoDB
1123 # did/does this, for instance). So check if it's in use, and
1124 # re-seed the table with the highest known fid from the file
1125 # table.
1127 # get the highest fid from the filetable and insert a dummy row
1128 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
1129 $ins_tempfile->(); # don't care about its result
1131 # then do a normal auto-increment
1132 $fid = undef;
1133 $ins_tempfile->() or die "register_tempfile failed after seeding";
1136 return $fid;
1139 # return hashref of row containing columns "fid, dmid, dkey, length,
1140 # classid, devcount" provided a $dmid and $key (dkey). or undef if no
1141 # row.
1142 sub file_row_from_dmid_key {
1143 my ($self, $dmid, $key) = @_;
1144 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1145 "FROM file WHERE dmid=? AND dkey=?",
1146 undef, $dmid, $key);
1149 # return hashref of row containing columns "fid, dmid, dkey, length,
1150 # classid, devcount" provided a $fidid or undef if no row.
1151 sub file_row_from_fidid {
1152 my ($self, $fidid) = @_;
1153 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1154 "FROM file WHERE fid=?",
1155 undef, $fidid);
1158 # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1159 # classid, devcount" given a starting $fromfid and a row count, or undef if no rows.
1160 sub file_row_from_fidid_range {
1161 my ($self, $fromfid, $count) = @_;
1162 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1163 "FROM file WHERE fid > ? LIMIT ?");
1164 $sth->execute($fromfid,$count);
1165 return $sth->fetchall_arrayref({});
1168 # return array of devids that a fidid is on
1169 sub fid_devids {
1170 my ($self, $fidid) = @_;
1171 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
1172 undef, $fidid) || [] };
1175 # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1176 sub fid_devids_multiple {
1177 my ($self, @fidids) = @_;
1178 my $in = join(",", map { $_+0 } @fidids);
1179 my $ret = {};
1180 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1181 $sth->execute;
1182 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1183 push @{$ret->{$fidid} ||= []}, $devid;
1185 return $ret;
1188 # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1189 sub tempfile_row_from_fid {
1190 my ($self, $fidid) = @_;
1191 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1192 "FROM tempfile WHERE fid=?",
1193 undef, $fidid);
1196 # return 1 on success, throw "dup" on duplicate devid, or throw another error on failure
1197 sub create_device {
1198 my ($self, $devid, $hostid, $status) = @_;
1199 my $rv = $self->conddup(sub {
1200 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1201 $devid, $hostid, $status);
1203 $self->condthrow;
1204 die "error making device $devid\n" unless $rv > 0;
1205 return 1;
1208 sub update_device {
1209 my ($self, $devid, $to_update) = @_;
1210 my @keys = sort keys %$to_update;
1211 return unless @keys;
1212 $self->conddup(sub {
1213 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1214 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
1215 $devid);
1217 return 1;
1220 sub update_device_usage {
1221 my $self = shift;
1222 my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
1223 eval {
1224 $self->dbh->do("UPDATE device SET ".
1225 "mb_total = ?, mb_used = ?, mb_asof = ?" .
1226 " WHERE devid = ?",
1227 undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
1228 $arg{devid});
1230 $self->condthrow;
1233 # MySQL has an optimized version
1234 sub update_device_usages {
1235 my ($self, $updates, $cb) = @_;
1236 foreach my $upd (@$updates) {
1237 $self->update_device_usage(%$upd);
1238 $cb->();
1242 # This is unimplemented at the moment as we must verify:
1243 # - no file_on rows exist
1244 # - nothing in file_to_queue is going to attempt to use it
1245 # - nothing in file_to_replicate is going to attempt to use it
1246 # - it's already been marked dead
1247 # - that all trackers are likely to know this :/
1248 # - ensure the devid can't be reused
1249 # i.e. the user can't mark it dead then remove it all at once and cause their
1250 # cluster to implode.
1251 sub delete_device {
1252 die "Unimplemented; needs further testing";
1255 sub set_device_weight {
1256 my ($self, $devid, $weight) = @_;
1257 eval {
1258 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1260 $self->condthrow;
1263 sub set_device_state {
1264 my ($self, $devid, $state) = @_;
1265 eval {
1266 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1268 $self->condthrow;
1271 sub delete_class {
1272 my ($self, $dmid, $cid) = @_;
1273 throw("has_files") if $self->class_has_files($dmid, $cid);
1274 eval {
1275 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1277 $self->condthrow;
1280 # called from a queryworker process, will trigger delete_fidid_enqueued
1281 # in the delete worker
1282 sub delete_fidid {
1283 my ($self, $fidid) = @_;
1284 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
1285 $self->condthrow;
1286 $self->enqueue_for_delete2($fidid, 0);
1287 $self->condthrow;
1290 # Only called from delete workers (after delete_fidid),
1291 # this reduces client-visible latency from the queryworker
1292 sub delete_fidid_enqueued {
1293 my ($self, $fidid) = @_;
1294 eval { $self->delete_checksum($fidid); };
1295 $self->condthrow;
1296 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1297 $self->condthrow;
1300 sub delete_tempfile_row {
1301 my ($self, $fidid) = @_;
1302 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1303 $self->condthrow;
1304 return $rv;
1307 # Load the specified tempfile, then delete it. If we succeed, we were
1308 # here first; otherwise, someone else beat us here (and we return undef)
1309 sub delete_and_return_tempfile_row {
1310 my ($self, $fidid) = @_;
1311 my $rv = $self->tempfile_row_from_fid($fidid);
1312 my $rows_deleted = $self->delete_tempfile_row($fidid);
1313 return $rv if ($rows_deleted > 0);
1316 sub replace_into_file {
1317 my $self = shift;
1318 my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
1319 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1320 eval {
1321 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1322 "VALUES (?,?,?,?,?,?) ", undef,
1323 @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'});
1325 $self->condthrow;
1328 # returns 1 on success, 0 on duplicate key error, dies on exception
1329 # TODO: need a test to hit the duplicate name error condition
1330 # TODO: switch to using "dup" exception here?
1331 sub rename_file {
1332 my ($self, $fidid, $to_key) = @_;
1333 my $dbh = $self->dbh;
1334 eval {
1335 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1336 undef, $to_key, $fidid);
1338 if ($@ || $dbh->err) {
1339 # first is MySQL's error code for duplicates
1340 if ($self->was_duplicate_error) {
1341 return 0;
1342 } else {
1343 die $@;
1346 $self->condthrow;
1347 return 1;
1350 sub get_domainid_by_name {
1351 my $self = shift;
1352 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
1353 undef, $_[0]);
1354 return $dmid;
1357 # returns a hash of domains. Key is namespace, value is dmid.
1358 sub get_all_domains {
1359 my ($self) = @_;
1360 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1361 return map { ($_->[0], $_->[1]) } @{$domains || []};
1364 sub get_classid_by_name {
1365 my $self = shift;
1366 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
1367 undef, $_[0], $_[1]);
1368 return $classid;
1371 # returns an array of hashrefs, one hashref per row in the 'class' table
1372 sub get_all_classes {
1373 my ($self) = @_;
1374 my (@ret, $row);
1376 my @cols = qw/dmid classid classname mindevcount/;
1377 if ($self->cached_schema_version >= 10) {
1378 push @cols, 'replpolicy';
1379 if ($self->cached_schema_version >= 15) {
1380 push @cols, 'hashtype';
1383 my $cols = join(', ', @cols);
1384 my $sth = $self->dbh->prepare("SELECT $cols FROM class");
1385 $sth->execute;
1386 push @ret, $row while $row = $sth->fetchrow_hashref;
1387 return @ret;
1390 # add a record of fidid existing on devid
1391 # returns 1 on success, 0 on duplicate
1392 sub add_fidid_to_devid {
1393 my ($self, $fidid, $devid) = @_;
1394 croak("fidid not non-zero") unless $fidid;
1395 croak("devid not non-zero") unless $devid;
1397 # TODO: This should possibly be insert_ignore instead.
1398 # If we are adding an extra file_on entry, we do not want to replace the
1399 # existing one. Check REPLACE semantics.
1400 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
1401 undef, $fidid, $devid);
1402 return 1 if $rv > 0;
1403 return 0;
1406 # remove a record of fidid existing on devid
1407 # returns 1 on success, 0 if not there anyway
1408 sub remove_fidid_from_devid {
1409 my ($self, $fidid, $devid) = @_;
1410 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
1411 undef, $fidid, $devid); };
1412 $self->condthrow;
1413 return $rv;
1416 # Test if host exists.
1417 sub get_hostid_by_id {
1418 my $self = shift;
1419 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
1420 undef, $_[0]);
1421 return $hostid;
1424 sub get_hostid_by_name {
1425 my $self = shift;
1426 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
1427 undef, $_[0]);
1428 return $hostid;
1431 # get all hosts from the database; returns a list of hashrefs, each being a row's contents.
1432 sub get_all_hosts {
1433 my ($self) = @_;
1434 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
1435 "hostip, http_port, http_get_port, altip, altmask FROM host");
1436 $sth->execute;
1437 my @ret;
1438 while (my $row = $sth->fetchrow_hashref) {
1439 push @ret, $row;
1441 return @ret;
1444 # get all devices from the database; returns a list of hashrefs, each being a row's contents.
1445 sub get_all_devices {
1446 my ($self) = @_;
1447 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
1448 "mb_used, mb_asof, status, weight FROM device");
1449 $self->condthrow;
1450 $sth->execute;
1451 my @return;
1452 while (my $row = $sth->fetchrow_hashref) {
1453 push @return, $row;
1455 return @return;
1458 # update the device count for a given fidid
1459 sub update_devcount {
1460 my ($self, $fidid) = @_;
1461 my $dbh = $self->dbh;
1462 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
1463 undef, $fidid);
1465 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
1466 $ct, $fidid); };
1467 $self->condthrow;
1469 return 1;
1472 # update the classid for a given fidid
1473 sub update_classid {
1474 my ($self, $fidid, $classid) = @_;
1475 my $dbh = $self->dbh;
1477 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
1478 $classid, $fidid);
1480 $self->condthrow;
1481 return 1;
1484 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
1485 sub enqueue_for_replication {
1486 my ($self, $fidid, $from_devid, $in) = @_;
1488 my $nexttry = 0;
1489 if ($in) {
1490 $nexttry = $self->unix_timestamp . " + " . int($in);
1493 $self->retry_on_deadlock(sub {
1494 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1495 "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1499 # enqueue a fidid for delete
1500 # note: if we get one more "independent" queue like this, the
1501 # code should probably be made collapsible. I tried once and it looked too ugly, so we have
1502 # some redundancy.
1503 sub enqueue_for_delete2 {
1504 my ($self, $fidid, $in) = @_;
1506 $in = 0 unless $in;
1507 my $nexttry = $self->unix_timestamp . " + " . int($in);
1509 $self->retry_on_deadlock(sub {
1510 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
1511 "VALUES (?,$nexttry)", undef, $fidid);
1515 # enqueue a fidid for work
1516 sub enqueue_for_todo {
1517 my ($self, $fidid, $type, $in) = @_;
1519 $in = 0 unless $in;
1520 my $nexttry = $self->unix_timestamp . " + " . int($in);
1522 $self->retry_on_deadlock(sub {
1523 if (ref($fidid)) {
1524 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
1525 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
1526 $fidid->[0], $fidid->[1], $fidid->[2], $type);
1527 } else {
1528 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
1529 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
1534 # return 1 on success. die otherwise.
1535 sub enqueue_many_for_todo {
1536 my ($self, $fidids, $type, $in) = @_;
1537 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
1538 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
1539 return 1;
1542 $in = 0 unless $in;
1543 my $nexttry = $self->unix_timestamp . " + " . int($in);
1545 # TODO: convert to prepared statement?
1546 $self->retry_on_deadlock(sub {
1547 if (ref($fidids->[0]) eq 'ARRAY') {
1548 my $sql = $self->ignore_replace .
1549 "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1550 join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
1551 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
1552 } else {
1553 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1554 nexttry) VALUES " .
1555 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
1558 $self->condthrow;
1561 # For file_to_queue queues that should be kept small, find the size.
1562 # This isn't fast, but for small queues it won't be slow, and it is usually only run
1563 # from a single tracker.
1564 sub file_queue_length {
1565 my $self = shift;
1566 my $type = shift;
1568 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
1569 "WHERE type = ?", undef, $type);
1572 # reschedule all deferred replication, return number rescheduled
1573 sub replicate_now {
1574 my ($self) = @_;
1576 $self->retry_on_deadlock(sub {
1577 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
1578 " WHERE nexttry > " . $self->unix_timestamp);
1582 # takes two arguments, devid and limit, both required. returns an arrayref of fidids.
1583 sub get_fidids_by_device {
1584 my ($self, $devid, $limit) = @_;
1586 my $dbh = $self->dbh;
1587 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
1588 undef, $devid);
1589 return $fidids;
1592 # finds a chunk of fids given a set of constraints:
1593 # devid, fidid, age (new or old), limit
1594 # Note that if this function is very slow on your large DB, you're likely
1595 # sorting by "newfiles" and are missing a new index.
1596 # returns an arrayref of fidids
1597 sub get_fidid_chunks_by_device {
1598 my ($self, %o) = @_;
1600 my $dbh = $self->dbh;
1601 my $devid = delete $o{devid};
1602 croak("must supply at least a devid") unless $devid;
1603 my $age = delete $o{age};
1604 my $fidid = delete $o{fidid};
1605 my $limit = delete $o{limit};
1606 croak("invalid options: " . join(', ', keys %o)) if %o;
1607 # If supplied a "previous" fidid, we're paging through.
1608 my $fidsort = '';
1609 my $order = '';
1610 $age ||= 'old';
1611 if ($age eq 'old') {
1612 $fidsort = 'AND fid > ?' if $fidid;
1613 $order = 'ASC';
1614 } elsif ($age eq 'new') {
1615 $fidsort = 'AND fid < ?' if $fidid;
1616 $order = 'DESC';
1617 } else {
1618 croak("invalid age argument: " . $age);
1620 $limit ||= 100;
1621 my @extra = ();
1622 push @extra, $fidid if $fidid;
1624 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
1625 $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
1626 return $fidids;
1629 # gets fidids above fidid_low up to (and including) fidid_high
1630 sub get_fidids_between {
1631 my ($self, $fidid_low, $fidid_high, $limit) = @_;
1632 $limit ||= 1000;
1633 $limit = int($limit);
1635 my $dbh = $self->dbh;
1636 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
1637 WHERE fid > ? and fid <= ?
1638 ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
1639 return $fidids;
1642 # creates a new domain, given a domain namespace string. return the dmid on success,
1643 # throw 'dup' on duplicate name.
1644 # override if you want a less racy version.
1645 sub create_domain {
1646 my ($self, $name) = @_;
1647 my $dbh = $self->dbh;
1649 # get the max domain id
1650 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
1651 my $rv = eval {
1652 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
1653 undef, $maxid + 1, $name);
1655 if ($self->was_duplicate_error) {
1656 throw("dup");
1658 return $maxid+1 if $rv;
1659 die "failed to make domain"; # FIXME: the above is racy.
1662 sub update_host {
1663 my ($self, $hid, $to_update) = @_;
1664 my @keys = sort keys %$to_update;
1665 return unless @keys;
1666 $self->conddup(sub {
1667 $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
1668 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
1669 $hid);
1671 return 1;
1674 # return new hostid, or throw 'dup' on error.
1675 # NOTE: you need to put them into the initial 'down' state.
1676 sub create_host {
1677 my ($self, $hostname, $ip) = @_;
1678 my $dbh = $self->dbh;
1679 # racy! lazy. no, better: portable! how often does this happen? :)
1680 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
1681 my $rv = $self->conddup(sub {
1682 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
1683 "VALUES (?, ?, ?, 'down')",
1684 undef, $hid, $hostname, $ip);
1686 return $hid if $rv;
1687 die "db failure";
1690 # return array of row hashrefs containing columns: (fid, fromdevid,
1691 # failcount, flags, nexttry)
1692 sub files_to_replicate {
1693 my ($self, $limit) = @_;
1694 my $ut = $self->unix_timestamp;
1695 my $to_repl_map = $self->dbh->selectall_hashref(qq{
1696 SELECT fid, fromdevid, failcount, flags, nexttry
1697 FROM file_to_replicate
1698 WHERE nexttry <= $ut
1699 ORDER BY nexttry
1700 LIMIT $limit
1701 }, "fid") or return ();
1702 return values %$to_repl_map;
1705 # "new" style queue consumption code.
1706 # from within a transaction, fetch a limit of fids,
1707 # then update each fid's nexttry to be off in the future,
1708 # giving local workers some time to dequeue the items.
1709 # Note:
1710 # DBI (even with RaiseError) returns weird errors on
1711 # deadlocks from selectall_hashref. So we can't do that.
1712 # we also used to retry on deadlock within the routine,
1713 # but instead we now return undef and let job_master retry.
1714 sub grab_queue_chunk {
1715 my $self = shift;
1716 my $queue = shift;
1717 my $limit = shift;
1718 my $extfields = shift;
1720 my $dbh = $self->dbh;
1721 my $tries = 3;
1722 my $work;
1724 return 0 unless $self->lock_queue($queue);
1726 my $extwhere = shift || '';
1727 my $fields = 'fid, nexttry, failcount';
1728 $fields .= ', ' . $extfields if $extfields;
1729 eval {
1730 $dbh->begin_work;
1731 my $ut = $self->unix_timestamp;
1732 my $query = qq{
1733 SELECT $fields
1734 FROM $queue
1735 WHERE nexttry <= $ut
1736 $extwhere
1737 ORDER BY nexttry
1738 LIMIT $limit
1740 $query .= "FOR UPDATE\n" if $self->can_for_update;
1741 my $sth = $dbh->prepare($query);
1742 $sth->execute;
1743 $work = $sth->fetchall_hashref('fid');
1744 # Nothing to work on.
1745 # Now claim the fids for a while.
1746 # TODO: Should be configurable... but not necessary.
1747 my $fidlist = join(',', keys %$work);
1748 unless ($fidlist) { $dbh->commit; return; }
1749 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1750 $dbh->commit;
1752 if ($self->was_deadlock_error) {
1753 eval { $dbh->rollback };
1754 $work = undef;
1755 } else {
1756 $self->condthrow;
1758 # FIXME: Super extra paranoia to prevent deadlocking.
1759 # Need to handle or die on all errors above, but $@ can get reset. For now
1760 # we'll just always ensure there's no transaction running at the end here.
1761 # A (near-future) release should get the error detection right.
1762 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
1763 $self->unlock_queue($queue);
1765 return defined $work ? values %$work : ();
1768 sub grab_files_to_replicate {
1769 my ($self, $limit) = @_;
1770 return $self->grab_queue_chunk('file_to_replicate', $limit,
1771 'fromdevid, flags');
1774 sub grab_files_to_delete2 {
1775 my ($self, $limit) = @_;
1776 return $self->grab_queue_chunk('file_to_delete2', $limit);
1779 # $extwhere is ugly... but should be fine.
1780 sub grab_files_to_queued {
1781 my ($self, $type, $what, $limit) = @_;
1782 $what ||= 'type, flags';
1783 return $self->grab_queue_chunk('file_to_queue', $limit,
1784 $what, 'AND type = ' . $type);
1787 # although it's safe to have multiple tracker hosts and/or processes
1788 # replicating the same file around, it's inefficient CPU/time-wise,
1789 # and it's also possible they pick different places and waste disk.
1790 # so the replicator asks the store interface when it's about to start
1791 # and when it's done replicating a fidid, so you can do something smart
1792 # and tell it not to.
1793 sub should_begin_replicating_fidid {
1794 my ($self, $fidid) = @_;
1795 my $lockname = "mgfs:fid:$fidid:replicate";
1796 return 1 if $self->get_lock($lockname, 1);
1797 return 0;
# called when the replicator is done replicating a fid, so you can clean up
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without the locking provided by
# should_begin_replicating_fidid/note_done_replicating) all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways. so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    my $lockname = "mgfs:fid:$fidid:replicate";
    $self->release_lock($lockname);
}

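# Illustrative sketch (commented out) of how a replicator is expected to use
# this pair, assuming $sto is the store and $fidid a file id:
#
#   if ($sto->should_begin_replicating_fidid($fidid)) {
#       eval {
#           # ... copy the file to its destination device(s) ...
#       };
#       $sto->note_done_replicating($fidid);
#   } else {
#       # another replicator already holds the lock; skip $fidid for now
#   }
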
sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
        undef, $fidid);
}

sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
        undef, $fidid);
}

sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
        undef, $fidid, $type);
}

sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
    });
}

sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
            undef, $fidid, $type);
    });
}

sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
    });
}

sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}

sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}

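# Illustrative sketch (commented out): a worker that can't act on a row can
# push it back with either an absolute or a relative nexttry; both bump
# failcount.  For example, retrying a replication in five minutes:
#
#   $sto->reschedule_file_to_replicate_relative($fidid, 5 * 60);
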
# Given a dmid, prefix, after-marker and limit, return an arrayref of dkeys
# from the file table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;

    # escape underscores, % and \
    $prefix =~ s/([%\\_])/\\$1/g;

    $prefix .= '%';
    $after = '' unless defined $after;

    my $like = $self->get_keys_like_operator;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after);
}

sub get_keys_like_operator { return "LIKE"; }

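# Illustrative sketch (commented out): listing keys under a prefix in pages
# of 100, feeding the last key of each page back in as the 'after' marker;
# $dmid and the "photos/" prefix are placeholders:
#
#   my $after = '';
#   while (1) {
#       my $keys = $sto->get_keys_like($dmid, "photos/", $after, 100);
#       last unless $keys && @$keys;
#       # ... do something with @$keys ...
#       $after = $keys->[-1];
#   }
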
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_old seconds ago or earlier.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
        "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring them if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead.
    # If we are adding an extra file_on entry, we do not want to replace the
    # existing one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}

sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}

# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };
}

# return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub when the first row causes the duplicate error and the
    # remaining rows are not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
            join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
}

sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub when the first row causes the duplicate error and the
    # remaining rows are not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
}

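# Illustrative sketch (commented out): queueing a batch of fids for deletion;
# file_to_delete2 rows become eligible immediately since nexttry is set to
# "now".  @doomed_fidids is a placeholder list of file ids:
#
#   $sto->enqueue_fids_to_delete2(@doomed_fidids);
#   my @rows = $sto->grab_files_to_delete2(500);
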
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}

# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially huge GROUP BY in the future.
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    # both ends of the range are inclusive:
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid++;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]);
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}

sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
        "VALUES (" . $self->unix_timestamp . ",?,?,?)",
        undef,
        delete $opts{fid},
        delete $opts{code},
        delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;

    return 1;
}

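# Illustrative sketch (commented out): recording an fsck event and then
# folding recent log rows into the summarized per-evcode counters.  The
# 'EXMP' event code is a placeholder, not a real evcode:
#
#   $sto->fsck_log(fid => $fidid, code => 'EXMP', devid => $devid);
#   $sto->fsck_log_summarize;
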
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}

# returns array of $row hashrefs, from the fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}

sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die if %opts;

    my $ret = {};
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}

# run before daemonizing. you can die from here if you see something's amiss,
# or emit warnings.
sub pre_daemonize_checks {
    my $self = shift;

    $self->pre_daemonize_check_slaves;
}

sub pre_daemonize_check_slaves {
    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return;

    my @slaves;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting("slave_$key");

        if (!$slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (!defined($dsn) or !defined($user) or !defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split on '|' into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [$dsn, $user, $pass];
    }

    return unless @slaves; # nothing to check if no slaves are configured

    MogileFS::run_global_hook('slave_list_check', \@slaves);
}

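# Illustrative sketch (commented out) of the server settings this check
# expects, with placeholder host names and credentials: 'slave_keys' is a
# comma-separated list of key names, and each corresponding 'slave_<key>'
# value is DSN|user|pass:
#
#   slave_keys   = slave1, slave2
#   slave_slave1 = DBI:mysql:database=mogilefs;host=10.0.0.2|mogile|sekrit
#   slave_slave2 = DBI:mysql:database=mogilefs;host=10.0.0.3|mogile|sekrit
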
# attempt to grab a lock named lockname, timing out after timeout seconds.
# returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# attempt to release the lock named lockname.
# returns 1 on success and 0 if we hold no lock by that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}

# MySQL has an issue where you either get excessive deadlocks, or INSERTs
# hang forever around some transactions. Use ghetto locking to cope.
sub lock_queue   { 1 }
sub unlock_queue { 1 }

sub BLOB_BIND_TYPE { undef; }

sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;

    eval {
        my $sth = $dbh->prepare("REPLACE INTO checksum " .
            "(fid, hashtype, checksum) " .
            "VALUES (?, ?, ?)");
        $sth->bind_param(1, $fidid);
        $sth->bind_param(2, $hashtype);
        $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
        $sth->execute;
    };
    $self->condthrow;
}

sub get_checksum {
    my ($self, $fidid) = @_;

    $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " .
        "FROM checksum WHERE fid = ?",
        undef, $fidid);
}

sub delete_checksum {
    my ($self, $fidid) = @_;

    $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
}

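# Illustrative sketch (commented out): storing and reading back a checksum
# row.  The hashtype is a small integer code and the checksum a binary
# digest; the values below are placeholders:
#
#   $sto->set_checksum($fidid, 1, $binary_digest);
#   my $row = $sto->get_checksum($fidid);  # { fid => ..., hashtype => 1, checksum => ... }
#   $sto->delete_checksum($fidid);
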
# set up the value used in a 'nexttry' field to indicate that this item will
# never actually be tried again and will require some sort of manual intervention.
use constant ENDOFTIME => 2147483647;

sub end_of_time { ENDOFTIME; }

# returns the size of the non-urgent replication queue:
#   nexttry == 0                        - the file is urgent
#   nexttry != 0 && nexttry < ENDOFTIME - the file is deferred
sub deferred_repl_queue_length {
    my ($self) = @_;

    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time);
}

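# Illustrative sketch (commented out): an item that should never be retried
# automatically can be parked at the end of time; such rows are excluded from
# deferred_repl_queue_length above:
#
#   $sto->reschedule_file_to_replicate_absolute($fidid, $sto->end_of_time);
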
1;

__END__

=head1 NAME

MogileFS::Store - data storage provider; base class.

=head1 ABOUT

MogileFS aims to be database-independent (though it was originally
developed against MySQL).  The server creates a singleton instance of a
MogileFS::Store subclass, such as L<MogileFS::Store::MySQL>, and all
database interaction goes through it.

=head1 SEE ALSO

L<MogileFS::Store::MySQL>