lib/MogileFS/Store.pm
1 package MogileFS::Store;
2 use strict;
3 use warnings;
4 use Carp qw(croak confess);
5 use MogileFS::Util qw(throw max error);
6 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
7 use List::Util qw(shuffle);
9 # this is incremented whenever the schema changes. the server will refuse
10 # to start up with an old schema version
12 # 6: adds file_to_replicate table
13 # 7: adds file_to_delete_later table
14 # 8: adds fsck_log table
15 # 9: adds 'drain' state to enum in device table
16 # 10: adds 'replpolicy' column to 'class' table
17 # 11: adds 'file_to_queue' table
18 # 12: adds 'file_to_delete2' table
19 # 13: modifies 'server_settings.value' to TEXT for wider values
20 # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21 # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22 # 15: adds checksum table, adds 'hashtype' column to 'class' table
23 use constant SCHEMA_VERSION => 15;
25 sub new {
26 my ($class) = @_;
27 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
30 sub new_from_dsn_user_pass {
31 my ($class, $dsn, $user, $pass, $max_handles) = @_;
32 my $subclass;
33 if ($dsn =~ /^DBI:mysql:/i) {
34 $subclass = "MogileFS::Store::MySQL";
35 } elsif ($dsn =~ /^DBI:SQLite:/i) {
36 $subclass = "MogileFS::Store::SQLite";
37 } elsif ($dsn =~ /^DBI:Oracle:/i) {
38 $subclass = "MogileFS::Store::Oracle";
39 } elsif ($dsn =~ /^DBI:Pg:/i) {
40 $subclass = "MogileFS::Store::Postgres";
41 } else {
42 die "Unknown database type: $dsn";
44 unless (eval "use $subclass; 1") {
45 die "Error loading $subclass: $@\n";
47 my $self = bless {
48 dsn => $dsn,
49 user => $user,
50 pass => $pass,
51 max_handles => $max_handles, # Max number of handles to allow
52 raise_errors => $subclass->want_raise_errors,
53 slave_list_version => 0,
54 slave_list_cache => [],
55 recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
56 recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
57 handles_left => 0, # amount of times this handle can still be verified
58 connected_slaves => {},
59 dead_slaves => {},
60 dead_backoff => {}, # how many times in a row a slave has died
61 connect_timeout => 10, # High default.
62 }, $subclass;
63 $self->init;
64 return $self;
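# Illustrative sketch (not part of the original module): callers normally go
# through new(), which pulls db_dsn/db_user/db_pass from MogileFS->config, but
# a store can also be built directly; the DSN, user, and password below are
# hypothetical:
#
#   my $sto = MogileFS::Store->new_from_dsn_user_pass(
#       "DBI:mysql:mogilefs;host=127.0.0.1", "mog", "sekrit", 100);
#   $sto->raise_errors;
#
# The DSN prefix is what selects the driver subclass (MySQL, SQLite, Oracle,
# or Postgres) above.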
67 # Defaults to true now.
68 sub want_raise_errors { 1 }
72 sub new_from_mogdbsetup {
73 my ($class, %args) = @_;
74 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
75 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
77 my $try_make_sto = sub {
78 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
79 PrintError => 0,
80 }) or return undef;
81 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
82 $sto->raise_errors;
83 return $sto;
86 # upgrading, apparently, as this database already exists.
87 my $sto = $try_make_sto->();
88 return $sto if $sto;
90 # otherwise, we need to make the requested database, setup permissions, etc
91 $class->status("couldn't connect to database as mogilefs user. trying root...");
92 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
93 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
94 PrintError => 0,
95 }) or
96 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
97 $class->status("connected to database as root user.");
99 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
100 $class->create_db_if_not_exists($rdbh, $args{dbname});
101 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
102 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
104 # should be ready now:
105 $sto = $try_make_sto->();
106 return $sto if $sto;
108 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
111 # given a root DBI connection, create the named database. succeed
112 # if it's made or already exists; die otherwise.
113 sub create_db_if_not_exists {
114 my ($pkg, $rdbh, $dbname) = @_;
115 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
116 or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
119 sub grant_privileges {
120 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
121 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
122 undef, $pass)
123 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
124 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
125 undef, $pass)
126 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
129 sub can_replace { 0 }
130 sub can_insertignore { 0 }
131 sub can_insert_multi { 0 }
132 sub can_for_update { 1 }
134 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
136 sub ignore_replace {
137 my $self = shift;
138 return "INSERT IGNORE " if $self->can_insertignore;
139 return "REPLACE " if $self->can_replace;
140 die "Can't INSERT IGNORE or REPLACE?";
143 my $on_status = sub {};
144 my $on_confirm = sub { 1 };
145 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
146 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
147 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
148 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
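# Illustrative sketch: a mogdbsetup-style tool would register these UI hooks
# before driving setup_database; the callbacks below are hypothetical:
#
#   MogileFS::Store->on_status(sub { print "$_[0]\n" });
#   MogileFS::Store->on_confirm(sub { print "$_[0] [y/N] "; <STDIN> =~ /^y/i });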
150 sub latest_schema_version { SCHEMA_VERSION }
152 sub raise_errors {
153 my $self = shift;
154 $self->{raise_errors} = 1;
155 $self->dbh->{RaiseError} = 1;
158 sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }
160 sub dsn { $_[0]{dsn} }
161 sub user { $_[0]{user} }
162 sub pass { $_[0]{pass} }
164 sub connect_timeout { $_[0]{connect_timeout} }
166 sub init { 1 }
167 sub post_dbi_connect { 1 }
169 sub can_do_slaves { 0 }
171 sub mark_as_slave {
172 my $self = shift;
173 die "Incapable of becoming slave." unless $self->can_do_slaves;
175 $self->{is_slave} = 1;
178 sub is_slave {
179 my $self = shift;
180 return $self->{is_slave};
183 sub _slaves_list_changed {
184 my $self = shift;
185 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
186 if ($ver <= $self->{slave_list_version}) {
187 return 0;
189 $self->{slave_list_version} = $ver;
190 # Restart connections from scratch if the configuration changed.
191 $self->{connected_slaves} = {};
192 return 1;
195 # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
196 sub _slaves_list {
197 my $self = shift;
198 my $now = time();
200 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
201 or return ();
203 my @ret;
204 foreach my $key (split /\s*,\s*/, $sk) {
205 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
207 if (!$slave) {
208 error("key for slave DB config: slave_$key not found in configuration");
209 next;
212 my ($dsn, $user, $pass) = split /\|/, $slave;
213 if (!defined($dsn) or !defined($user) or !defined($pass)) {
214 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
215 next;
217 push @ret, [$dsn, $user, $pass]
220 return @ret;
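# Illustrative sketch: slave configuration lives in server_settings; the
# values below are hypothetical:
#
#   slave_keys => "s1,s2"
#   slave_s1   => "DBI:mysql:mogilefs;host=10.0.0.2|mogslave|sekrit"
#
# Each slave_<key> entry splits on '|' into the [$dsn, $user, $pass] triple
# returned above.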
223 sub _pick_slave {
224 my $self = shift;
225 my @temp = shuffle keys %{$self->{connected_slaves}};
226 return unless @temp;
227 return $self->{connected_slaves}->{$temp[0]};
230 sub _connect_slave {
231 my $self = shift;
232 my $slave_fulldsn = shift;
233 my $now = time();
235 my $dead_retry =
236 MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
238 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
239 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
240 return if (defined $dead_timeout
241 && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
242 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
244 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
245 $newslave->set_connect_timeout(
246 MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
247 $self->{slave}->{next_check} = 0;
248 $newslave->mark_as_slave;
249 if ($self->check_slave) {
250 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
251 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
252 } else {
253 # Magic numbers are saddening...
254 $dead_backoff++ unless $dead_backoff > 20;
255 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
256 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
260 sub get_slave {
261 my $self = shift;
263 die "Incapable of having slaves." unless $self->can_do_slaves;
265 $self->{slave} = undef;
266 foreach my $slave (keys %{$self->{dead_slaves}}) {
267 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
268 unless ($full_dsn) {
269 delete $self->{dead_slaves}->{$slave};
270 next;
272 $self->_connect_slave($full_dsn);
275 unless ($self->_slaves_list_changed) {
276 if ($self->{slave} = $self->_pick_slave) {
277 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
278 return $self->{slave} if $self->check_slave;
282 if ($self->{slave}) {
283 my $dsn = $self->{slave}->{dsn};
284 $self->{dead_slaves}->{$dsn} = time();
285 $self->{dead_backoff}->{$dsn} = 0;
286 delete $self->{connected_slaves}->{$dsn};
287 error("Error talking to slave: $dsn");
289 my @slaves_list = $self->_slaves_list;
291 # If we have no slaves, then return silently.
292 return unless @slaves_list;
294 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');
296 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
297 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
300 $self->{slave_list_cache} = \@slaves_list;
302 foreach my $slave_fulldsn (@slaves_list) {
303 $self->_connect_slave($slave_fulldsn);
306 if ($self->{slave} = $self->_pick_slave) {
307 return $self->{slave};
309 warn "Slave list exhausted, failing back to master.";
310 return;
313 sub read_store {
314 my $self = shift;
316 return $self unless $self->can_do_slaves;
318 if ($self->{slave_ok}) {
319 if (my $slave = $self->get_slave) {
320 return $slave;
324 return $self;
327 sub slaves_ok {
328 my $self = shift;
329 my $coderef = shift;
331 return unless ref $coderef eq 'CODE';
333 local $self->{slave_ok} = 1;
335 return $coderef->(@_);
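# Illustrative sketch: a read path opts into slave usage by wrapping its
# queries; $sto and the arguments are hypothetical:
#
#   my $row = $sto->slaves_ok(sub {
#       $sto->read_store->file_row_from_dmid_key($dmid, $key);
#   });
#
# Inside the coderef, read_store may hand back a connected slave; writes
# should keep using the master store directly.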
338 sub recheck_dbh {
339 my $self = shift;
340 $self->{recheck_req_gen}++;
343 sub dbh {
344 my $self = shift;
346 if ($self->{dbh}) {
347 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
348 $self->{dbh} = undef unless $self->{dbh}->ping;
349 # Handles a memory leak under Solaris/Postgres.
350 $self->{dbh} = undef if ($self->{max_handles} &&
351 $self->{handles_left}-- < 0);
352 $self->{recheck_done_gen} = $self->{recheck_req_gen};
354 return $self->{dbh} if $self->{dbh};
357 # Shortcut flag: if monitor thinks the master is down, avoid attempting to
358 # connect to it for now. If we already have a connection to the master,
359 # keep using it as above.
360 if (!$self->is_slave) {
361 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
362 return if (defined $flag && $flag == 0);
365 eval {
366 local $SIG{ALRM} = sub { die "timeout\n" };
367 alarm($self->connect_timeout);
368 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
369 PrintError => 0,
370 AutoCommit => 1,
371 # FUTURE: will default to on (have to validate all callers first):
372 RaiseError => ($self->{raise_errors} || 0),
375 alarm(0);
376 if ($@ eq "timeout\n") {
377 die "Failed to connect to database: timeout";
378 } elsif ($@) {
379 die "Failed to connect to database: " . DBI->errstr;
381 $self->post_dbi_connect;
382 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
383 return $self->{dbh};
386 sub have_dbh { return 1 if $_[0]->{dbh}; }
388 sub ping {
389 my $self = shift;
390 return $self->dbh->ping;
393 sub condthrow {
394 my ($self, $optmsg) = @_;
395 my $dbh = $self->dbh;
396 return 1 unless $dbh->err;
397 my ($pkg, $fn, $line) = caller;
398 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
399 $msg .= ": $optmsg" if $optmsg;
400 # Auto rollback failures around transactions.
401 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
402 croak($msg);
405 sub dowell {
406 my ($self, $sql, @do_params) = @_;
407 my $rv = eval { $self->dbh->do($sql, @do_params) };
408 return $rv unless $@ || $self->dbh->err;
409 warn "Error with SQL: $sql\n";
410 Carp::confess($@ || $self->dbh->errstr);
413 sub _valid_params {
414 croak("Odd number of parameters!") if scalar(@_) % 2;
415 my ($self, $vlist, %uarg) = @_;
416 my %ret;
417 $ret{$_} = delete $uarg{$_} foreach @$vlist;
418 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
419 return %ret;
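# Illustrative sketch of how the named-parameter helper above is called; the
# values are hypothetical:
#
#   my %arg = $self->_valid_params([qw(dmid classid classname)],
#                                  dmid => 1, classid => 2, classname => "thumb");
#
# Unknown keys (or an odd-length argument list) croak.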
422 sub was_deadlock_error {
423 my $self = shift;
424 my $dbh = $self->dbh;
425 die "UNIMPLEMENTED";
428 sub was_duplicate_error {
429 my $self = shift;
430 my $dbh = $self->dbh;
431 die "UNIMPLEMENTED";
434 # run a subref (presumably a database update) in an eval, because you expect it
435 # might fail with a duplicate-key error; if so, throw a 'dup' exception for you,
436 # otherwise return its return value
437 sub conddup {
438 my ($self, $code) = @_;
439 my $rv = eval { $code->(); };
440 throw("dup") if $self->was_duplicate_error;
441 croak($@) if $@;
442 return $rv;
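# Illustrative sketch: callers wrap statements that may hit a unique index so
# duplicate-key failures surface as a "dup" exception; simplified example:
#
#   $self->conddup(sub {
#       $self->dbh->do("INSERT INTO domain (dmid, namespace) VALUES (?,?)",
#                      undef, $dmid, $name);
#   });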
445 # insert row if doesn't already exist
446 # WARNING: This function is NOT transaction safe if a duplicate error causes
447 # your transaction to halt!
448 # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
449 # is false! Rows before the duplicate will be inserted, but rows after the
450 # duplicate might not be, depending on your database.
451 sub insert_ignore {
452 my ($self, $sql, @params) = @_;
453 my $dbh = $self->dbh;
454 if ($self->can_insertignore) {
455 return $dbh->do("INSERT IGNORE $sql", @params);
456 } else {
457 # TODO: Detect bad multi-row insert here.
458 my $rv = eval { $dbh->do("INSERT $sql", @params); };
459 if ($@ || $dbh->err) {
460 return 1 if $self->was_duplicate_error;
461 # This chunk is identical to condthrow, but we include it directly
462 # here as we know there is definitely an error, and we would like to
463 # report the caller of this function.
464 my ($pkg, $fn, $line) = caller;
465 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
466 croak($msg);
468 return $rv;
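# Illustrative sketch: the SQL handed to insert_ignore starts at "INTO ...",
# since the INSERT/INSERT IGNORE prefix is added here; simplified example:
#
#   $self->insert_ignore("INTO file_on (fid, devid) VALUES (?,?)",
#                        undef, $fidid, $devid);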
472 sub retry_on_deadlock {
473 my $self = shift;
474 my $code = shift;
475 my $tries = shift || 3;
476 croak("deadlock retries must be positive") if $tries < 1;
477 my $rv;
479 while ($tries-- > 0) {
480 $rv = eval { $code->(); };
481 next if ($self->was_deadlock_error);
482 croak($@) if $@;
483 last;
485 return $rv;
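# Illustrative sketch: queue maintenance statements are wrapped so transient
# deadlocks get retried a few times; simplified example:
#
#   $self->retry_on_deadlock(sub {
#       $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fid);
#   });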
488 # --------------------------------------------------------------------------
490 my @extra_tables;
492 sub add_extra_tables {
493 my $class = shift;
494 push @extra_tables, @_;
497 use constant TABLES => qw( domain class file tempfile file_to_delete
498 unreachable_fids file_on file_on_corrupt host
499 device server_settings file_to_replicate
500 file_to_delete_later fsck_log file_to_queue
501 file_to_delete2 checksum);
503 sub setup_database {
504 my $sto = shift;
506 my $curver = $sto->schema_version;
508 my $latestver = SCHEMA_VERSION;
509 if ($curver == $latestver) {
510 $sto->status("Schema already up-to-date at version $curver.");
511 return 1;
514 if ($curver > $latestver) {
515 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
518 if ($curver) {
519 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
522 foreach my $t (TABLES, @extra_tables) {
523 $sto->create_table($t);
526 $sto->upgrade_add_host_getport;
527 $sto->upgrade_add_host_altip;
528 $sto->upgrade_add_device_asof;
529 $sto->upgrade_add_device_weight;
530 $sto->upgrade_add_device_readonly;
531 $sto->upgrade_add_device_drain;
532 $sto->upgrade_add_class_replpolicy;
533 $sto->upgrade_modify_server_settings_value;
534 $sto->upgrade_add_file_to_queue_arg;
535 $sto->upgrade_modify_device_size;
536 $sto->upgrade_add_class_hashtype;
538 return 1;
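# Illustrative sketch: mogdbsetup effectively does the following; every
# argument value here is hypothetical:
#
#   my $sto = MogileFS::Store->new_from_mogdbsetup(
#       dbhost => "localhost", dbport => 3306, dbname => "mogilefs",
#       dbuser => "mog", dbpass => "sekrit",
#       dbrootuser => "root", dbrootpass => "");
#   $sto->setup_database or die "database setup failed";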
541 sub cached_schema_version {
542 my $self = shift;
543 return $self->{_cached_schema_version} ||=
544 $self->schema_version;
547 sub schema_version {
548 my $self = shift;
549 my $dbh = $self->dbh;
550 return eval {
551 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
552 } || 0;
555 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
557 sub create_table {
558 my ($self, $table) = @_;
559 my $dbh = $self->dbh;
560 return 1 if $self->table_exists($table);
561 my $meth = "TABLE_$table";
562 my $sql = $self->$meth;
563 $sql = $self->filter_create_sql($sql);
564 $self->status("Running SQL: $sql;");
565 $dbh->do($sql) or
566 die "Failed to create table $table: " . $dbh->errstr;
567 my $imeth = "INDEXES_$table";
568 my @indexes = eval { $self->$imeth };
569 foreach $sql (@indexes) {
570 $self->status("Running SQL: $sql;");
571 $dbh->do($sql) or
572 die "Failed to create indexes on $table: " . $dbh->errstr;
576 # Please try to keep all tables aligned nicely
577 # with '"CREATE TABLE' on the first line
578 # and ')"' alone on the last line.
580 sub TABLE_domain {
581 # classes are tied to domains. domains can have classes of items
582 # with different mindevcounts.
584 # a minimum devcount is the number of copies the system tries to
585 # maintain for files in that class
587 # unspecified classname means classid=0 (implicit class), and that
588 # implies mindevcount=2
589 "CREATE TABLE domain (
590 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
591 namespace VARCHAR(255),
592 UNIQUE (namespace)
596 sub TABLE_class {
597 "CREATE TABLE class (
598 dmid SMALLINT UNSIGNED NOT NULL,
599 classid TINYINT UNSIGNED NOT NULL,
600 PRIMARY KEY (dmid,classid),
601 classname VARCHAR(50),
602 UNIQUE (dmid,classname),
603 mindevcount TINYINT UNSIGNED NOT NULL,
604 hashtype TINYINT UNSIGNED
608 # the length field is only here for easy verifications of content
609 # integrity when copying around. no sums or content types or other
610 # metadata here. application can handle that.
612 # classid is what class of file this belongs to. for instance, on fotobilder
613 # there will be a class for original pictures (the ones the user uploaded)
614 # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
615 # each domain can setup classes and assign the minimum redundancy level for
616 # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
617 # photos and a 1 minimum for derived images (which means the sole device
618 # for a derived image can die, bringing devcount to 0 for that file, but
619 # the application can recreate it from its original)
620 sub TABLE_file {
621 "CREATE TABLE file (
622 fid INT UNSIGNED NOT NULL,
623 PRIMARY KEY (fid),
625 dmid SMALLINT UNSIGNED NOT NULL,
626 dkey VARCHAR(255), # domain-defined
627 UNIQUE dkey (dmid, dkey),
629 length BIGINT UNSIGNED, # big limit
631 classid TINYINT UNSIGNED NOT NULL,
632 devcount TINYINT UNSIGNED NOT NULL,
633 INDEX devcount (dmid,classid,devcount)
637 sub TABLE_tempfile {
638 "CREATE TABLE tempfile (
639 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
640 PRIMARY KEY (fid),
642 createtime INT UNSIGNED NOT NULL,
643 classid TINYINT UNSIGNED NOT NULL,
644 dmid SMALLINT UNSIGNED NOT NULL,
645 dkey VARCHAR(255),
646 devids VARCHAR(60)
650 # files marked for death when their key is overwritten. then they get a new
651 # fid, but since the old row (with the old fid) had to be deleted immediately,
652 # we need a place to store the fid so an async job can delete the file from
653 # all devices.
654 sub TABLE_file_to_delete {
655 "CREATE TABLE file_to_delete (
656 fid INT UNSIGNED NOT NULL,
657 PRIMARY KEY (fid)
661 # if the replicator notices that a fid has no sources, that file gets inserted
662 # into the unreachable_fids table. it is up to the application to actually
663 # handle fids stored in this table.
664 sub TABLE_unreachable_fids {
665 "CREATE TABLE unreachable_fids (
666 fid INT UNSIGNED NOT NULL,
667 lastupdate INT UNSIGNED NOT NULL,
668 PRIMARY KEY (fid),
669 INDEX (lastupdate)
673 # what files are on what devices? (most likely physical devices,
674 # as logical devices of RAID arrays would be costly, and mogilefs
675 # already handles redundancy)
677 # the devid index lets us answer "What files were on this now-dead disk?"
678 sub TABLE_file_on {
679 "CREATE TABLE file_on (
680 fid INT UNSIGNED NOT NULL,
681 devid MEDIUMINT UNSIGNED NOT NULL,
682 PRIMARY KEY (fid, devid),
683 INDEX (devid)
687 # if application or framework detects an error in one of the duplicate files
688 # for whatever reason, it can register its complaint and the framework
689 # will do some verifications and fix things up w/ an async job
690 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
691 # on the other devices?
692 sub TABLE_file_on_corrupt {
693 "CREATE TABLE file_on_corrupt (
694 fid INT UNSIGNED NOT NULL,
695 devid MEDIUMINT UNSIGNED NOT NULL,
696 PRIMARY KEY (fid, devid)
700 # hosts (which contain devices...)
701 sub TABLE_host {
702 "CREATE TABLE host (
703 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
705 status ENUM('alive','dead','down'),
706 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
707 http_get_port MEDIUMINT UNSIGNED,
709 hostname VARCHAR(40),
710 hostip VARCHAR(15),
711 altip VARCHAR(15),
712 altmask VARCHAR(18),
713 UNIQUE (hostname),
714 UNIQUE (hostip),
715 UNIQUE (altip)
719 # disks...
720 sub TABLE_device {
721 "CREATE TABLE device (
722 devid MEDIUMINT UNSIGNED NOT NULL,
723 hostid MEDIUMINT UNSIGNED NOT NULL,
725 status ENUM('alive','dead','down'),
726 weight MEDIUMINT DEFAULT 100,
728 mb_total INT UNSIGNED,
729 mb_used INT UNSIGNED,
730 mb_asof INT UNSIGNED,
731 PRIMARY KEY (devid),
732 INDEX (status)
736 sub TABLE_server_settings {
737 "CREATE TABLE server_settings (
738 field VARCHAR(50) PRIMARY KEY,
739 value TEXT
743 sub TABLE_file_to_replicate {
744 # nexttry is time to try to replicate it next.
745 # 0 means immediate. it's only on one host.
746 # 1 means lower priority. it's on 2+ but isn't happy where it's at.
747 # unix timestamp means at/after that time. some previous error occurred.
748 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
749 # failcount. how many times we've failed, just for doing backoff of nexttry.
750 # flags. reserved for future use.
751 "CREATE TABLE file_to_replicate (
752 fid INT UNSIGNED NOT NULL PRIMARY KEY,
753 nexttry INT UNSIGNED NOT NULL,
754 INDEX (nexttry),
755 fromdevid INT UNSIGNED,
756 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
757 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
761 sub TABLE_file_to_delete_later {
762 "CREATE TABLE file_to_delete_later (
763 fid INT UNSIGNED NOT NULL PRIMARY KEY,
764 delafter INT UNSIGNED NOT NULL,
765 INDEX (delafter)
769 sub TABLE_fsck_log {
770 "CREATE TABLE fsck_log (
771 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
772 PRIMARY KEY (logid),
773 utime INT UNSIGNED NOT NULL,
774 fid INT UNSIGNED NULL,
775 evcode CHAR(4),
776 devid MEDIUMINT UNSIGNED,
777 INDEX(utime)
781 # generic queue table, designed to be used for workers/jobs which aren't
782 # constantly in use, and are async to the user.
783 # i.e., fsck, drain, rebalance.
784 sub TABLE_file_to_queue {
785 "CREATE TABLE file_to_queue (
786 fid INT UNSIGNED NOT NULL,
787 devid INT UNSIGNED,
788 type TINYINT UNSIGNED NOT NULL,
789 nexttry INT UNSIGNED NOT NULL,
790 failcount TINYINT UNSIGNED NOT NULL default '0',
791 flags SMALLINT UNSIGNED NOT NULL default '0',
792 arg TEXT,
793 PRIMARY KEY (fid, type),
794 INDEX type_nexttry (type,nexttry)
798 # new style async delete table.
799 # this is separate from file_to_queue since deletes are more actively used,
800 # and partitioning on 'type' doesn't always work so well.
801 sub TABLE_file_to_delete2 {
802 "CREATE TABLE file_to_delete2 (
803 fid INT UNSIGNED NOT NULL PRIMARY KEY,
804 nexttry INT UNSIGNED NOT NULL,
805 failcount TINYINT UNSIGNED NOT NULL default '0',
806 INDEX nexttry (nexttry)
810 sub TABLE_checksum {
811 "CREATE TABLE checksum (
812 fid INT UNSIGNED NOT NULL PRIMARY KEY,
813 hashtype TINYINT UNSIGNED NOT NULL,
814 checksum VARBINARY(64) NOT NULL
818 # these five are only necessary for MySQL; since no other database existed
819 # before, they can just create the tables correctly to begin with.
820 # in the future, there might be new alters that non-MySQL databases
821 # will have to implement.
822 sub upgrade_add_host_getport { 1 }
823 sub upgrade_add_host_altip { 1 }
824 sub upgrade_add_device_asof { 1 }
825 sub upgrade_add_device_weight { 1 }
826 sub upgrade_add_device_readonly { 1 }
827 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
828 sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
829 sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
830 sub upgrade_modify_device_size { die "Not implemented in $_[0]" }
832 sub upgrade_add_class_replpolicy {
833 my ($self) = @_;
834 unless ($self->column_type("class", "replpolicy")) {
835 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
839 sub upgrade_add_class_hashtype {
840 my ($self) = @_;
841 unless ($self->column_type("class", "hashtype")) {
842 $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
846 # return true if deleted, 0 if didn't exist, exception if error
847 sub delete_host {
848 my ($self, $hostid) = @_;
849 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
852 # return true if deleted, 0 if didn't exist, exception if error
853 sub delete_domain {
854 my ($self, $dmid) = @_;
855 throw("has_files") if $self->domain_has_files($dmid);
856 throw("has_classes") if $self->domain_has_classes($dmid);
857 return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
860 sub domain_has_files {
861 my ($self, $dmid) = @_;
862 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
863 undef, $dmid);
864 return $has_a_fid ? 1 : 0;
867 sub domain_has_classes {
868 my ($self, $dmid) = @_;
869 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? LIMIT 1',
870 undef, $dmid);
871 return $has_a_class ? 1 : 0;
874 sub class_has_files {
875 my ($self, $dmid, $clid) = @_;
876 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
877 undef, $dmid, $clid);
878 return $has_a_fid ? 1 : 0;
881 # return new classid on success (non-zero integer), die on failure
882 # throw 'dup' on duplicate name
883 # override this if you want a less racy version.
884 sub create_class {
885 my ($self, $dmid, $classname) = @_;
886 my $dbh = $self->dbh;
888 # get the max class id in this domain
889 my $maxid = $dbh->selectrow_array
890 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
892 my $clsid = $maxid + 1;
893 if ($classname eq 'default') {
894 $clsid = 0;
897 # now insert the new class
898 my $rv = eval {
899 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
900 undef, $dmid, $clsid, $classname, 2);
902 if ($@ || $dbh->err) {
903 if ($self->was_duplicate_error) {
904 throw("dup");
907 return $clsid if $rv;
908 $self->condthrow;
909 die;
912 # return 1 on success, throw "dup" on duplicate name error, die otherwise
913 sub update_class_name {
914 my $self = shift;
915 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
916 my $rv = eval {
917 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
918 undef, $arg{classname}, $arg{dmid}, $arg{classid});
920 throw("dup") if $self->was_duplicate_error;
921 $self->condthrow;
922 return 1;
925 # return 1 on success, die otherwise
926 sub update_class_mindevcount {
927 my $self = shift;
928 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
929 eval {
930 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
931 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
933 $self->condthrow;
934 return 1;
937 # return 1 on success, die otherwise
938 sub update_class_replpolicy {
939 my $self = shift;
940 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
941 eval {
942 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
943 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
945 $self->condthrow;
946 return 1;
949 # return 1 on success, die otherwise
950 sub update_class_hashtype {
951 my $self = shift;
952 my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
953 eval {
954 $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
955 undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
957 $self->condthrow;
960 sub nfiles_with_dmid_classid_devcount {
961 my ($self, $dmid, $classid, $devcount) = @_;
962 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
963 undef, $dmid, $classid, $devcount);
966 sub set_server_setting {
967 my ($self, $key, $val) = @_;
968 my $dbh = $self->dbh;
969 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
971 eval {
972 if (defined $val) {
973 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
974 } else {
975 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
979 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
980 return 1;
983 # FIXME: racy. for the only current caller it doesn't matter, but this should be fixed.
984 sub incr_server_setting {
985 my ($self, $key, $val) = @_;
986 $val = 1 unless defined $val;
987 return unless $val;
989 return 1 if $self->dbh->do("UPDATE server_settings ".
990 "SET value=value+? ".
991 "WHERE field=?", undef,
992 $val, $key) > 0;
993 $self->set_server_setting($key, $val);
996 sub server_setting {
997 my ($self, $key) = @_;
998 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
999 undef, $key);
1002 sub server_settings {
1003 my ($self) = @_;
1004 my $ret = {};
1005 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
1006 $sth->execute;
1007 while (my ($k, $v) = $sth->fetchrow_array) {
1008 $ret->{$k} = $v;
1010 return $ret;
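# Illustrative sketch of the setting helpers above; the key name is
# hypothetical:
#
#   $sto->set_server_setting("some_flag", "on");   # upsert
#   my $v = $sto->server_setting("some_flag");     # single value
#   my $all = $sto->server_settings;               # hashref of field => value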
1013 # register a tempfile and return the fidid, which should be allocated
1014 # using autoincrement/sequences if the passed in fid is undef. however,
1015 # if fid is passed in, that value should be used and returned.
1017 # return new/passed in fidid on success.
1018 # throw 'dup' if fid already in use
1019 # return 0/undef/die on failure
1021 sub register_tempfile {
1022 my $self = shift;
1023 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
1025 my $dbh = $self->dbh;
1026 my $fid = $arg{fid};
1028 my $explicit_fid_used = $fid ? 1 : 0;
1030 # setup the new mapping. we store the devices that we picked for
1031 # this file in here, knowing that they might not be used. create_close
1032 # is responsible for actually mapping in file_on. NOTE: fid is being
1033 # passed in, it's either some number they gave us, or it's going to be
1034 # 0/undef which translates into NULL which means to automatically create
1035 # one. that should be fine.
1036 my $ins_tempfile = sub {
1037 my $rv = eval {
1038 # We must only pass the correct number of bind parameters
1039 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
1040 # Postgres, where you are expected to leave it out or use DEFAULT
1041 # Leaving it out seems sanest and least likely to cause problems
1042 # with other databases.
1043 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
1044 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
1045 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
1046 # Do not check for $explicit_fid_used, but rather $fid directly
1047 # as this anonymous sub is called from the loop later
1048 if($fid) {
1049 unshift @keys, 'fid';
1050 unshift @vars, '?';
1051 unshift @vals, $fid;
1053 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
1054 $dbh->do($sql, undef, @vals);
1056 if (!$rv) {
1057 return undef if $self->was_duplicate_error;
1058 die "Unexpected db error into tempfile: " . $dbh->errstr;
1061 unless (defined $fid) {
1062 # if they did not give us a fid, then we want to grab the one that was
1063 # theoretically automatically generated
1064 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
1065 or die "No last_insert_id found";
1067 return undef unless defined $fid && $fid > 0;
1068 return 1;
1071 unless ($ins_tempfile->()) {
1072 throw("dup") if $explicit_fid_used;
1073 die "tempfile insert failed";
1076 my $fid_in_use = sub {
1077 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
1078 return $exists ? 1 : 0;
1081 # See notes in MogileFS::Config->check_database
1082 my $min_fidid = MogileFS::Config->config('min_fidid');
1084 # if the fid is in use, do something
1085 while ($fid_in_use->($fid) || $fid <= $min_fidid) {
1086 throw("dup") if $explicit_fid_used;
1088 # be careful of databases which reset their
1089 # auto-increment/sequences when the table is empty (InnoDB
1090 # did/does this, for instance). So check if it's in use, and
1091 # re-seed the table with the highest known fid from the file
1092 # table.
1094 # get the highest fid from the filetable and insert a dummy row
1095 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
1096 $ins_tempfile->(); # don't care about its result
1098 # then do a normal auto-increment
1099 $fid = undef;
1100 $ins_tempfile->() or die "register_tempfile failed after seeding";
1103 return $fid;
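# Illustrative sketch: a create_open handler registers its chosen devices as a
# tempfile before the client uploads; the values are hypothetical:
#
#   my $fidid = $sto->register_tempfile(
#       fid     => undef,            # let the database allocate one
#       dmid    => $dmid, key => $key, classid => $classid,
#       devids  => "3,7,12");
#
# A "dup" exception is only thrown when an explicitly requested fid is
# already taken.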
1106 # return hashref of row containing columns "fid, dmid, dkey, length,
1107 # classid, devcount" provided a $dmid and $key (dkey). or undef if no
1108 # row.
1109 sub file_row_from_dmid_key {
1110 my ($self, $dmid, $key) = @_;
1111 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1112 "FROM file WHERE dmid=? AND dkey=?",
1113 undef, $dmid, $key);
1116 # return hashref of row containing columns "fid, dmid, dkey, length,
1117 # classid, devcount" provided a $fidid or undef if no row.
1118 sub file_row_from_fidid {
1119 my ($self, $fidid) = @_;
1120 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1121 "FROM file WHERE fid=?",
1122 undef, $fidid);
1125 # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1126 # classid, devcount" given a starting $fidid and a row count, or undef if no rows.
1127 sub file_row_from_fidid_range {
1128 my ($self, $fromfid, $count) = @_;
1129 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1130 "FROM file WHERE fid > ? LIMIT ?");
1131 $sth->execute($fromfid,$count);
1132 return $sth->fetchall_arrayref({});
1135 # return array of devids that a fidid is on
1136 sub fid_devids {
1137 my ($self, $fidid) = @_;
1138 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
1139 undef, $fidid) || [] };
1142 # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1143 sub fid_devids_multiple {
1144 my ($self, @fidids) = @_;
1145 my $in = join(",", map { $_+0 } @fidids);
1146 my $ret = {};
1147 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1148 $sth->execute;
1149 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1150 push @{$ret->{$fidid} ||= []}, $devid;
1152 return $ret;
1155 # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1156 sub tempfile_row_from_fid {
1157 my ($self, $fidid) = @_;
1158 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1159 "FROM tempfile WHERE fid=?",
1160 undef, $fidid);
1163 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
1164 sub create_device {
1165 my ($self, $devid, $hostid, $status) = @_;
1166 my $rv = $self->conddup(sub {
1167 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1168 $devid, $hostid, $status);
1170 $self->condthrow;
1171 die "error making device $devid\n" unless $rv > 0;
1172 return 1;
1175 sub update_device {
1176 my ($self, $devid, $to_update) = @_;
1177 my @keys = sort keys %$to_update;
1178 return unless @keys;
1179 $self->conddup(sub {
1180 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1181 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
1182 $devid);
1184 return 1;
1187 sub update_device_usage {
1188 my $self = shift;
1189 my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
1190 eval {
1191 $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
1192 " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
1194 $self->condthrow;
1197 # This is unimplemented at the moment as we must verify:
1198 # - no file_on rows exist
1199 # - nothing in file_to_queue is going to attempt to use it
1200 # - nothing in file_to_replicate is going to attempt to use it
1201 # - it's already been marked dead
1202 # - that all trackers are likely to know this :/
1203 # - ensure the devid can't be reused
1204 # IE; the user can't mark it dead then remove it all at once and cause their
1205 # cluster to implode.
1206 sub delete_device {
1207 die "Unimplemented; needs further testing";
1210 sub mark_fidid_unreachable {
1211 my ($self, $fidid) = @_;
1212 die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
1213 $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
1214 undef, $fidid);
1217 sub set_device_weight {
1218 my ($self, $devid, $weight) = @_;
1219 eval {
1220 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1222 $self->condthrow;
1225 sub set_device_state {
1226 my ($self, $devid, $state) = @_;
1227 eval {
1228 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1230 $self->condthrow;
1233 sub delete_class {
1234 my ($self, $dmid, $cid) = @_;
1235 throw("has_files") if $self->class_has_files($dmid, $cid);
1236 eval {
1237 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1239 $self->condthrow;
1242 sub delete_fidid {
1243 my ($self, $fidid) = @_;
1244 eval { $self->delete_checksum($fidid); };
1245 $self->condthrow;
1246 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
1247 $self->condthrow;
1248 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1249 $self->condthrow;
1250 $self->enqueue_for_delete2($fidid, 0);
1251 $self->condthrow;
1254 sub delete_tempfile_row {
1255 my ($self, $fidid) = @_;
1256 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1257 $self->condthrow;
1258 return $rv;
1261 # Load the specified tempfile, then delete it. If we succeed, we were
1262 # here first; otherwise, someone else beat us here (and we return undef)
1263 sub delete_and_return_tempfile_row {
1264 my ($self, $fidid) = @_;
1265 my $rv = $self->tempfile_row_from_fid($fidid);
1266 my $rows_deleted = $self->delete_tempfile_row($fidid);
1267 return $rv if ($rows_deleted > 0);
1270 sub replace_into_file {
1271 my $self = shift;
1272 my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
1273 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1274 eval {
1275 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1276 "VALUES (?,?,?,?,?,?) ", undef,
1277 @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'});
1279 $self->condthrow;
1282 # returns 1 on success, 0 on duplicate key error, dies on exception
1283 # TODO: need a test to hit the duplicate name error condition
1284 # TODO: switch to using "dup" exception here?
1285 sub rename_file {
1286 my ($self, $fidid, $to_key) = @_;
1287 my $dbh = $self->dbh;
1288 eval {
1289 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1290 undef, $to_key, $fidid);
1292 if ($@ || $dbh->err) {
1293 # a duplicate key error means the destination key already exists
1294 if ($self->was_duplicate_error) {
1295 return 0;
1296 } else {
1297 die $@;
1300 $self->condthrow;
1301 return 1;
1304 sub get_domainid_by_name {
1305 my $self = shift;
1306 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
1307 undef, $_[0]);
1308 return $dmid;
1311 # returns a hash of domains. Key is namespace, value is dmid.
1312 sub get_all_domains {
1313 my ($self) = @_;
1314 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1315 return map { ($_->[0], $_->[1]) } @{$domains || []};
1318 sub get_classid_by_name {
1319 my $self = shift;
1320 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
1321 undef, $_[0], $_[1]);
1322 return $classid;
1325 # returns an array of hashrefs, one hashref per row in the 'class' table
1326 sub get_all_classes {
1327 my ($self) = @_;
1328 my (@ret, $row);
1330 my @cols = qw/dmid classid classname mindevcount/;
1331 if ($self->cached_schema_version >= 10) {
1332 push @cols, 'replpolicy';
1333 if ($self->cached_schema_version >= 15) {
1334 push @cols, 'hashtype';
1337 my $cols = join(', ', @cols);
1338 my $sth = $self->dbh->prepare("SELECT $cols FROM class");
1339 $sth->execute;
1340 push @ret, $row while $row = $sth->fetchrow_hashref;
1341 return @ret;
1344 # add a record of fidid existing on devid
1345 # returns 1 on success, 0 on duplicate
1346 sub add_fidid_to_devid {
1347 my ($self, $fidid, $devid) = @_;
1348 croak("fidid not non-zero") unless $fidid;
1349 croak("devid not non-zero") unless $devid;
1351 # TODO: This should possibly be insert_ignore instead.
1352 # If we are adding an extra file_on entry, we do not want to replace the
1353 # existing one. Check REPLACE semantics.
1354 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
1355 undef, $fidid, $devid);
1356 return 1 if $rv > 0;
1357 return 0;
1360 # remove a record of fidid existing on devid
1361 # returns 1 on success, 0 if not there anyway
1362 sub remove_fidid_from_devid {
1363 my ($self, $fidid, $devid) = @_;
1364 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
1365 undef, $fidid, $devid); };
1366 $self->condthrow;
1367 return $rv;
1370 # Test if host exists.
1371 sub get_hostid_by_id {
1372 my $self = shift;
1373 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
1374 undef, $_[0]);
1375 return $hostid;
1378 sub get_hostid_by_name {
1379 my $self = shift;
1380 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
1381 undef, $_[0]);
1382 return $hostid;
1385 # get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
1386 sub get_all_hosts {
1387 my ($self) = @_;
1388 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
1389 "hostip, http_port, http_get_port, altip, altmask FROM host");
1390 $sth->execute;
1391 my @ret;
1392 while (my $row = $sth->fetchrow_hashref) {
1393 push @ret, $row;
1395 return @ret;
1398 # get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
1399 sub get_all_devices {
1400 my ($self) = @_;
1401 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
1402 "mb_used, mb_asof, status, weight FROM device");
1403 $self->condthrow;
1404 $sth->execute;
1405 my @return;
1406 while (my $row = $sth->fetchrow_hashref) {
1407 push @return, $row;
1409 return @return;
1412 # update the device count for a given fidid
1413 sub update_devcount {
1414 my ($self, $fidid) = @_;
1415 my $dbh = $self->dbh;
1416 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
1417 undef, $fidid);
1419 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
1420 $ct, $fidid); };
1421 $self->condthrow;
1423 return 1;
1426 # update the classid for a given fidid
1427 sub update_classid {
1428 my ($self, $fidid, $classid) = @_;
1429 my $dbh = $self->dbh;
1431 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
1432 $classid, $fidid);
1434 $self->condthrow;
1435 return 1;
1438 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
1439 sub enqueue_for_replication {
1440 my ($self, $fidid, $from_devid, $in) = @_;
1442 my $nexttry = 0;
1443 if ($in) {
1444 $nexttry = $self->unix_timestamp . " + " . int($in);
1447 $self->retry_on_deadlock(sub {
1448 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1449 "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1453 # enqueue a fidid for delete
1454 # note: if we get one more "independent" queue like this, the
1455 # code should be collapsable? I tried once and it looked too ugly, so we have
1456 # some redundancy.
1457 sub enqueue_for_delete2 {
1458 my ($self, $fidid, $in) = @_;
1460 $in = 0 unless $in;
1461 my $nexttry = $self->unix_timestamp . " + " . int($in);
1463 $self->retry_on_deadlock(sub {
1464 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
1465 "VALUES (?,$nexttry)", undef, $fidid);
1469 # enqueue a fidid for work
1470 sub enqueue_for_todo {
1471 my ($self, $fidid, $type, $in) = @_;
1473 $in = 0 unless $in;
1474 my $nexttry = $self->unix_timestamp . " + " . int($in);
1476 $self->retry_on_deadlock(sub {
1477 if (ref($fidid)) {
1478 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
1479 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
1480 $fidid->[0], $fidid->[1], $fidid->[2], $type);
1481 } else {
1482 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
1483 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
1488 # return 1 on success. die otherwise.
1489 sub enqueue_many_for_todo {
1490 my ($self, $fidids, $type, $in) = @_;
1491 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
1492 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
1493 return 1;
1496 $in = 0 unless $in;
1497 my $nexttry = $self->unix_timestamp . " + " . int($in);
1499 # TODO: convert to prepared statement?
1500 $self->retry_on_deadlock(sub {
1501 if (ref($fidids->[0]) eq 'ARRAY') {
1502 my $sql = $self->ignore_replace .
1503 "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1504 join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
1505 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
1506 } else {
1507 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1508 nexttry) VALUES " .
1509 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
1512 $self->condthrow;
1515 # For file_to_queue queues that should be kept small, find the size.
1516 # This isn't fast, but for small queues it won't be slow, and it is usually only
1517 # run from a single tracker.
1518 sub file_queue_length {
1519 my $self = shift;
1520 my $type = shift;
1522 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
1523 "WHERE type = ?", undef, $type);
1526 # reschedule all deferred replication, return number rescheduled
1527 sub replicate_now {
1528 my ($self) = @_;
1530 $self->retry_on_deadlock(sub {
1531 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
1532 " WHERE nexttry > " . $self->unix_timestamp);
1536 # takes two arguments, devid and limit, both required. returns an arrayref of fidids.
1537 sub get_fidids_by_device {
1538 my ($self, $devid, $limit) = @_;
1540 my $dbh = $self->dbh;
1541 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
1542 undef, $devid);
1543 return $fidids;
1546 # finds a chunk of fids given a set of constraints:
1547 # devid, fidid, age (new or old), limit
1548 # Note that if this function is very slow on your large DB, you're likely
1549 # sorting by "newfiles" and are missing a new index.
1550 # returns an arrayref of fidids
1551 sub get_fidid_chunks_by_device {
1552 my ($self, %o) = @_;
1554 my $dbh = $self->dbh;
1555 my $devid = delete $o{devid};
1556 croak("must supply at least a devid") unless $devid;
1557 my $age = delete $o{age};
1558 my $fidid = delete $o{fidid};
1559 my $limit = delete $o{limit};
1560 croak("invalid options: " . join(', ', keys %o)) if %o;
1561 # If supplied a "previous" fidid, we're paging through.
1562 my $fidsort = '';
1563 my $order = '';
1564 $age ||= 'old';
1565 if ($age eq 'old') {
1566 $fidsort = 'AND fid > ?' if $fidid;
1567 $order = 'ASC';
1568 } elsif ($age eq 'new') {
1569 $fidsort = 'AND fid < ?' if $fidid;
1570 $order = 'DESC';
1571 } else {
1572 croak("invalid age argument: " . $age);
1574 $limit ||= 100;
1575 my @extra = ();
1576 push @extra, $fidid if $fidid;
1578 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
1579 $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
1580 return $fidids;
1583 # gets fidids above fidid_low up to (and including) fidid_high
1584 sub get_fidids_between {
1585 my ($self, $fidid_low, $fidid_high, $limit) = @_;
1586 $limit ||= 1000;
1587 $limit = int($limit);
1589 my $dbh = $self->dbh;
1590 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
1591 WHERE fid > ? and fid <= ?
1592 ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
1593 return $fidids;
1596 # creates a new domain, given a domain namespace string. return the dmid on success,
1597 # throw 'dup' on duplicate name.
1598 # override if you want a less racy version.
1599 sub create_domain {
1600 my ($self, $name) = @_;
1601 my $dbh = $self->dbh;
1603 # get the max domain id
1604 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
1605 my $rv = eval {
1606 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
1607 undef, $maxid + 1, $name);
1609 if ($self->was_duplicate_error) {
1610 throw("dup");
1612 return $maxid+1 if $rv;
1613 die "failed to make domain"; # FIXME: the above is racy.
1616 sub update_host {
1617 my ($self, $hid, $to_update) = @_;
1618 my @keys = sort keys %$to_update;
1619 return unless @keys;
1620 $self->conddup(sub {
1621 $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
1622 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
1623 $hid);
1625 return 1;
1628 sub update_host_property {
1629 my ($self, $hostid, $col, $val) = @_;
1630 $self->conddup(sub {
1631 $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
1633 return 1;
1636 # return new hostid, or throw 'dup' on error.
1637 # NOTE: you need to put them into the initial 'down' state.
1638 sub create_host {
1639 my ($self, $hostname, $ip) = @_;
1640 my $dbh = $self->dbh;
1641 # racy! lazy. no, better: portable! how often does this happen? :)
1642 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
1643 my $rv = $self->conddup(sub {
1644 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
1645 "VALUES (?, ?, ?, 'down')",
1646 undef, $hid, $hostname, $ip);
1648 return $hid if $rv;
1649 die "db failure";
1652 # return array of row hashrefs containing columns: (fid, fromdevid,
1653 # failcount, flags, nexttry)
1654 sub files_to_replicate {
1655 my ($self, $limit) = @_;
1656 my $ut = $self->unix_timestamp;
1657 my $to_repl_map = $self->dbh->selectall_hashref(qq{
1658 SELECT fid, fromdevid, failcount, flags, nexttry
1659 FROM file_to_replicate
1660 WHERE nexttry <= $ut
1661 ORDER BY nexttry
1662 LIMIT $limit
1663 }, "fid") or return ();
1664 return values %$to_repl_map;
1667 # "new" style queue consumption code.
1668 # from within a transaction, fetch a limit of fids,
1669 # then update each fid's nexttry to be off in the future,
1670 # giving local workers some time to dequeue the items.
1671 # Note:
1672 # DBI (even with RaiseError) returns weird errors on
1673 # deadlocks from selectall_hashref. So we can't do that.
1674 # we also used to retry on deadlock within the routine,
1675 # but instead we now return undef and let job_master retry.
1676 sub grab_queue_chunk {
1677 my $self = shift;
1678 my $queue = shift;
1679 my $limit = shift;
1680 my $extfields = shift;
1682 my $dbh = $self->dbh;
1683 my $tries = 3;
1684 my $work;
1686 return 0 unless $self->lock_queue($queue);
1688 my $extwhere = shift || '';
1689 my $fields = 'fid, nexttry, failcount';
1690 $fields .= ', ' . $extfields if $extfields;
1691 eval {
1692 $dbh->begin_work;
1693 my $ut = $self->unix_timestamp;
1694 my $query = qq{
1695 SELECT $fields
1696 FROM $queue
1697 WHERE nexttry <= $ut
1698 $extwhere
1699 ORDER BY nexttry
1700 LIMIT $limit
1702 $query .= "FOR UPDATE\n" if $self->can_for_update;
1703 my $sth = $dbh->prepare($query);
1704 $sth->execute;
1705 $work = $sth->fetchall_hashref('fid');
1706 # Nothing to work on.
1707 # Now claim the fids for a while.
1708 # TODO: Should be configurable... but not necessary.
1709 my $fidlist = join(',', keys %$work);
1710 unless ($fidlist) { $dbh->commit; return; }
1711 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1712 $dbh->commit;
1714 if ($self->was_deadlock_error) {
1715 eval { $dbh->rollback };
1716 $work = undef;
1717 } else {
1718 $self->condthrow;
1720 # FIXME: Super extra paranoia to prevent deadlocking.
1721 # Need to handle or die on all errors above, but $@ can get reset. For now
1722 # we'll just always ensure there's no transaction running at the end here.
1723 # A (near) release should figure the error detection correctly.
1724 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
1725 $self->unlock_queue($queue);
1727 return defined $work ? values %$work : ();
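# Illustrative sketch: a worker drains one of the queues via the grab_*
# wrappers below, doing the work and then removing (or rescheduling) each
# row; simplified example:
#
#   foreach my $row ($sto->grab_files_to_replicate(100)) {
#       # ... attempt replication of $row->{fid} ...
#       $sto->delete_fid_from_file_to_replicate($row->{fid});
#   }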
1730 sub grab_files_to_replicate {
1731 my ($self, $limit) = @_;
1732 return $self->grab_queue_chunk('file_to_replicate', $limit,
1733 'fromdevid, flags');
1736 sub grab_files_to_delete2 {
1737 my ($self, $limit) = @_;
1738 return $self->grab_queue_chunk('file_to_delete2', $limit);
1741 # $extwhere is ugly... but should be fine.
1742 sub grab_files_to_queued {
1743 my ($self, $type, $what, $limit) = @_;
1744 $what ||= 'type, flags';
1745 return $self->grab_queue_chunk('file_to_queue', $limit,
1746 $what, 'AND type = ' . $type);
1749 # although it's safe to have multiple tracker hosts and/or processes
1750 # replicating the same file around, it's inefficient CPU/time-wise,
1751 # and it's also possible they pick different places and waste disk.
1752 # so the replicator asks the store interface when it's about to start
1753 # and when it's done replicating a fidid, so you can do something smart
1754 # and tell it not to.
1755 sub should_begin_replicating_fidid {
1756 my ($self, $fidid) = @_;
1757 my $lockname = "mgfs:fid:$fidid:replicate";
1758 return 1 if $self->get_lock($lockname, 1);
1759 return 0;
1762 # called when replicator is done replicating a fid, so you can cleanup
1763 # whatever you did in 'should_begin_replicating_fidid' above.
1765 # NOTE: there's a theoretical race condition in the rebalance code,
1766 # where (without locking as provided by
1767 # should_begin_replicating_fidid/note_done_replicating), all copies of
1768 # a file can be deleted by independent replicators doing rebalancing
1769 # in different ways. so you'll probably want to implement some
1770 # locking in this pair of functions.
1771 sub note_done_replicating {
1772 my ($self, $fidid) = @_;
1773 my $lockname = "mgfs:fid:$fidid:replicate";
1774 $self->release_lock($lockname);
1777 sub find_fid_from_file_to_replicate {
1778 my ($self, $fidid) = @_;
1779 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
1780 undef, $fidid);
1783 sub find_fid_from_file_to_delete2 {
1784 my ($self, $fidid) = @_;
1785 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
1786 undef, $fidid);
1789 sub find_fid_from_file_to_queue {
1790 my ($self, $fidid, $type) = @_;
1791 return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
1792 undef, $fidid, $type);
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
    });
}

sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
            undef, $fidid, $type);
    });
}

sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
    });
}

sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}

sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
            undef, $abstime, $fid);
    });
}

sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    });
}

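# A retry-policy sketch for callers of the reschedule helpers above (not the
# shipped scheduling logic): back off exponentially on the failcount already
# recorded in the queue row.
#
#   my $row   = $sto->find_fid_from_file_to_replicate($fidid);
#   my $delay = 60 * (2 ** ($row->{failcount} || 0));
#   $sto->reschedule_file_to_replicate_relative($fidid, $delay);
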
# Given a dmid, prefix, after, and limit, return an arrayref of dkey from the
# file table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # normalize the input... prefix always ends with a % so that it works
    # in a LIKE clause, and after is either blank or something
    $prefix = '' unless defined $prefix;
    $prefix .= '%';
    $after = '' unless defined $after;

    my $like = $self->get_keys_like_operator;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? AND dkey > ? " .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}

sub get_keys_like_operator { return "LIKE"; }

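# Pagination sketch (illustrative): pass the last key returned back in as
# $after to walk a domain's keys in dkey order, a page at a time.
#
#   my $after = '';
#   while (1) {
#       my $keys = $sto->get_keys_like($dmid, "img/", $after, 1000);
#       last unless $keys && @$keys;
#       $after = $keys->[-1];
#       # ... do something with @$keys ...
#   }
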
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_old seconds ago or older.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring them if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead.
    # If we're adding an extra file_on entry, we don't want to replace the
    # existing one.  Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}

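# Typical producer-side call, as a sketch (the DevFID constructor arguments
# shown are illustrative; see MogileFS::DevFID for the real interface):
#
#   my @devfids = map { MogileFS::DevFID->new($_, $fidid) } @devids;
#   $sto->mass_insert_file_on(@devfids);
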
# NOTE: the sub name is (historically) misspelled; it's kept as-is since
# existing callers use this spelling.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}

# returns an array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };
}

# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub
    # when the first row causes the duplicate error and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
            join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
}

sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub
    # when the first row causes the duplicate error and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, nexttry) VALUES " .
            join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
}

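# Sketch of the enqueue/consume cycle these queue helpers support
# (illustrative only):
#
#   $sto->enqueue_fids_to_delete2(@doomed_fidids);            # producer side
#   my @rows = $sto->grab_files_to_delete2(100);              # worker side
#   $sto->delete_fid_from_file_to_delete2($_->{fid}) for @rows;
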
# clears everything from the fsck_log table
# return 1 on success.  die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}

# FIXME: Fsck log entries are processed a little out of order.
# Once an fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY later.
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    # both ends of the range are inclusive:
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid++;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]);
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}

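# The summarized counters end up in server_settings, so a status reader can
# avoid a large GROUP BY over fsck_log.  Sketch (where $evcode is whatever
# event code you care about):
#
#   my $count = $sto->server_setting("fsck_sum_evcount_$evcode") || 0;
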
sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
        "VALUES (" . $self->unix_timestamp . ",?,?,?)",
        undef,
        delete $opts{fid},
        delete $opts{code},
        delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;

    return 1;
}

sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}

# returns an array of $row hashrefs from the fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}

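# Paging sketch (illustrative): feed the last logid seen back in as
# $after_logid until no more rows come back.
#
#   my $after = 0;
#   while (my @rows = $sto->fsck_log_rows($after, 100)) {
#       $after = $rows[-1]{logid};
#       # ... process @rows ...
#   }
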
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die "unknown opts" if %opts;

    my $ret = {};
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}

# Run before daemonizing.  You can die from here if you see something's amiss,
# or emit warnings.
sub pre_daemonize_checks {
    my $self = shift;

    $self->pre_daemonize_check_slaves;
}

sub pre_daemonize_check_slaves {
    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return;

    my @slaves;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting("slave_$key");

        if (!$slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (!defined($dsn) or !defined($user) or !defined($pass)) {
            error("key slave_$key contains $slave, which doesn't split on | into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [$dsn, $user, $pass];
    }

    return unless @slaves; # nothing to do if no slaves are configured

    MogileFS::run_global_hook('slave_list_check', \@slaves);
}

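# For example, slave DBs might be configured via server settings like the
# following (values are illustrative only); each slave_<key> value must split
# on '|' into DSN|user|pass, as parsed above:
#
#   slave_keys   = slave1,slave2
#   slave_slave1 = DBI:mysql:mogilefs;host=10.0.0.2|mogile|sekrit
#   slave_slave2 = DBI:mysql:mogilefs;host=10.0.0.3|mogile|sekrit
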
# Attempt to grab a lock named $lockname, timing out after $timeout seconds.
# Returns 1 on success and 0 on timeout.  Dies if more than one lock is
# already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}).  Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# Attempt to release the lock named $lockname.
# Returns 1 on success and 0 if no lock we hold has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}

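# A sketch of how a database-specific subclass might implement this pair
# (MySQL-flavored and illustrative only, not the shipped implementation):
#
#   sub get_lock {
#       my ($self, $lockname, $timeout) = @_;
#       die "lock recursion detected" if $self->{lock_depth};
#       my ($rv) = $self->dbh->selectrow_array("SELECT GET_LOCK(?, ?)",
#                                              undef, $lockname, $timeout);
#       if ($rv) {
#           $self->{lock_depth} = 1;
#           $self->{last_lock}  = $lockname;
#       }
#       return $rv;
#   }
#
#   sub release_lock {
#       my ($self, $lockname) = @_;
#       my ($rv) = $self->dbh->selectrow_array("SELECT RELEASE_LOCK(?)",
#                                              undef, $lockname);
#       $self->{lock_depth} = 0;
#       return $rv;
#   }
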
# MySQL has an issue where you either get excessive deadlocks, or INSERTs
# hang forever around some transactions.  Use ghetto locking to cope.
sub lock_queue   { 1 }
sub unlock_queue { 1 }

sub BLOB_BIND_TYPE { undef; }

sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;

    eval {
        my $sth = $dbh->prepare("REPLACE INTO checksum " .
                                "(fid, hashtype, checksum) " .
                                "VALUES (?, ?, ?)");
        $sth->bind_param(1, $fidid);
        $sth->bind_param(2, $hashtype);
        $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
        $sth->execute;
    };
    $self->condthrow;
}

sub get_checksum {
    my ($self, $fidid) = @_;

    $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " .
                                  "FROM checksum WHERE fid = ?",
                                  undef, $fidid);
}

sub delete_checksum {
    my ($self, $fidid) = @_;

    $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
}

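# Illustrative round trip (hashtype is a small integer identifying the digest
# algorithm; the checksum itself is stored as a raw binary value):
#
#   $sto->set_checksum($fidid, $hashtype, $raw_digest);
#   my $row = $sto->get_checksum($fidid);  # { fid => ..., hashtype => ..., checksum => ... }
#   $sto->delete_checksum($fidid);
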
# Set up the value used in a 'nexttry' field to indicate that this item will
# never actually be tried again and requires some sort of manual intervention.
use constant ENDOFTIME => 2147483647;

sub end_of_time { ENDOFTIME; }

# Returns the size of the non-urgent replication queue.
# nexttry == 0                        - the file is urgent
# nexttry != 0 && nexttry < ENDOFTIME - the file is deferred
sub deferred_repl_queue_length {
    my ($self) = @_;

    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time);
}

1;

__END__

=head1 NAME

MogileFS::Store - data storage provider.  base class.

2188 MogileFS aims to be database-independent (though currently as of late
2189 2006 only works with MySQL). In the future, the server will create a
2190 singleton instance of type "MogileFS::Store", like
2191 L<MogileFS::Store::MySQL>, and all database interaction will be
2192 through it.
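Code inside the server normally reaches the store through that singleton.  A
rough sketch (the C<Mgd::get_store()> accessor name reflects the server
internals and is shown here only for illustration):

    my $sto = Mgd::get_store();          # singleton MogileFS::Store subclass
    my $len = $sto->deferred_repl_queue_length;
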
=head1 SEE ALSO

L<MogileFS::Store::MySQL>