1 package MogileFS
::Store
;
5 use MogileFS
::Util
qw(throw max error);
6 use DBI
; # no reason a Store has to be DBI-based, but for now they all are.
9 # this is incremented whenever the schema changes. server will refuse
10 # to start-up with an old schema version
12 # 6: adds file_to_replicate table
13 # 7: adds file_to_delete_later table
14 # 8: adds fsck_log table
15 # 9: adds 'drain' state to enum in device table
16 # 10: adds 'replpolicy' column to 'class' table
17 # 11: adds 'file_to_queue' table
18 # 12: adds 'file_to_delete2' table
19 # 13: modifies 'server_settings.value' to TEXT for wider values
20 # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21 use constant SCHEMA_VERSION
=> 13;
25 return $class->new_from_dsn_user_pass(map { MogileFS
->config($_) } qw(db_dsn db_user db_pass max_handles));
# Factory: choose the driver-specific Store subclass from the DSN prefix
# (mysql / SQLite / Oracle / Pg), load it at runtime, and construct it.
# Dies on an unrecognized DSN or if the subclass fails to load.
sub new_from_dsn_user_pass
{
    my ($class, $dsn, $user, $pass, $max_handles) = @_;
    if ($dsn =~ /^DBI:mysql:/i) {
        $subclass = "MogileFS::Store::MySQL";
    } elsif ($dsn =~ /^DBI:SQLite:/i) {
        $subclass = "MogileFS::Store::SQLite";
    } elsif ($dsn =~ /^DBI:Oracle:/i) {
        $subclass = "MogileFS::Store::Oracle";
    } elsif ($dsn =~ /^DBI:Pg:/i) {
        $subclass = "MogileFS::Store::Postgres";
        # Unrecognized DSN prefixes refuse to start rather than guess a driver.
        die "Unknown database type: $dsn";
    # String eval is the standard dynamic-load idiom here; $subclass comes
    # only from the fixed table above, never from untrusted input.
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
49 max_handles
=> $max_handles, # Max number of handles to allow
50 raise_errors
=> $subclass->want_raise_errors,
51 slave_list_cachetime
=> 0,
52 slave_list_cache
=> [],
53 recheck_req_gen
=> 0, # incremented generation, of recheck of dbh being requested
54 recheck_done_gen
=> 0, # once recheck is done, copy of what the request generation was
55 handles_left
=> 0, # amount of times this handle can still be verified
56 server_setting_cache
=> {}, # value-agnostic db setting cache.
62 # Defaults to true now.
63 sub want_raise_errors
{
67 sub new_from_mogdbsetup
{
68 my ($class, %args) = @_;
69 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
70 my $dsn = $class->dsn_of_dbhost($args{dbname
}, $args{dbhost
}, $args{dbport
});
72 my $try_make_sto = sub {
73 my $dbh = DBI
->connect($dsn, $args{dbuser
}, $args{dbpass
}, {
76 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser
}, $args{dbpass
});
81 # upgrading, apparently, as this database already exists.
82 my $sto = $try_make_sto->();
85 # otherwise, we need to make the requested database, setup permissions, etc
86 $class->status("couldn't connect to database as mogilefs user. trying root...");
87 my $rootdsn = $class->dsn_of_root($args{dbname
}, $args{dbhost
}, $args{dbport
});
88 my $rdbh = DBI
->connect($rootdsn, $args{dbrootuser
}, $args{dbrootpass
}, {
91 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI
->errstr . "\n";
92 $class->status("connected to database as root user.");
94 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
95 $class->create_db_if_not_exists($rdbh, $args{dbname
});
96 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
97 $class->grant_privileges($rdbh, $args{dbname
}, $args{dbuser
}, $args{dbpass
});
99 # should be ready now:
100 $sto = $try_make_sto->();
103 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
106 # given a root DBI connection, create the named database. succeed
107 # if it it's made, or already exists. die otherwise.
sub create_db_if_not_exists
{
    my ($pkg, $rdbh, $dbname) = @_;
    # $dbname is interpolated as an SQL identifier (placeholders cannot bind
    # identifiers).  It comes from the mogdbsetup operator, not end users;
    # validate/quote it if that ever changes.
    $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
# Grant the mogilefs user full rights on its database, for both remote
# ('%') and local ('localhost') connections.  MySQL-flavored SQL; driver
# subclasses override where this syntax does not apply.
sub grant_privileges
{
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    # NOTE(review): $dbname and $user are interpolated (identifiers can't be
    # bound); only the password travels as a bind parameter.  Operator input.
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
# Capability probe: does this driver support REPLACE INTO?
# The base store assumes not; driver subclasses override.
sub can_replace { return 0 }
# Capability probe: does this driver support INSERT IGNORE?
# The base store assumes not; driver subclasses override.
sub can_insertignore { return 0 }
# Capability probe: does this driver support multi-row INSERT?
# The base store assumes not; driver subclasses override.
sub can_insert_multi { return 0 }
# SQL expression for the database's current unix time; must be overridden
# per driver (the base class has no portable way to express it).
sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
132 return "INSERT IGNORE " if $self->can_insertignore;
133 return "REPLACE " if $self->can_replace;
134 die "Can't INSERT IGNORE or REPLACE?";
# Class-level hooks used during schema setup: a status reporter and a
# yes/no confirmer.  mogdbsetup installs real callbacks via on_status /
# on_confirm; the defaults say nothing and approve everything.
my $status_cb  = sub {};
my $confirm_cb = sub { 1 };

# Install the status-reporting callback (class method).
sub on_status {
    my ($pkg, $code) = @_;
    $status_cb = $code;
}

# Install the confirmation callback (class method).
sub on_confirm {
    my ($pkg, $code) = @_;
    $confirm_cb = $code;
}

# Report a progress message through the installed callback.
sub status {
    my ($pkg, $msg) = @_;
    $status_cb->($msg);
}

# Ask the operator to confirm; abort the whole run if refused.
sub confirm {
    my ($pkg, $msg) = @_;
    $confirm_cb->($msg) or die "Aborted.\n";
}
# Newest schema version this code knows how to install/upgrade to
# (see the SCHEMA_VERSION change history near the top of this file).
sub latest_schema_version { return SCHEMA_VERSION }
148 $self->{raise_errors
} = 1;
149 $self->dbh->{RaiseError
} = 1;
# Read-only accessor: the DSN this store was constructed with.
sub dsn {
    my ($self) = @_;
    return $self->{dsn};
}
# Read-only accessor: the database username.
sub user {
    my ($self) = @_;
    return $self->{user};
}
# Read-only accessor: the database password.
sub pass {
    my ($self) = @_;
    return $self->{pass};
}
# Hook run right after a successful DBI->connect; subclasses may override
# to perform per-connection setup.  Base implementation is a no-op.
sub post_dbi_connect { return 1 }
# Whether this driver supports read slaves; off by default, drivers
# that support it override to return true.
sub can_do_slaves { return 0 }
163 die "Incapable of becoming slave." unless $self->can_do_slaves;
170 return $self->{slave
};
173 # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
178 # only reload every 15 seconds.
179 if ($self->{slave_list_cachetime
} > $now - 15) {
180 return @
{$self->{slave_list_cache
}};
182 $self->{slave_list_cachetime
} = $now;
183 $self->{slave_list_cache
} = [];
185 my $sk = MogileFS
::Config
->server_setting('slave_keys')
189 foreach my $key (split /\s*,\s*/, $sk) {
190 my $slave = MogileFS
::Config
->server_setting("slave_$key");
193 error
("key for slave DB config: slave_$key not found in configuration");
197 my ($dsn, $user, $pass) = split /\|/, $slave;
198 if (!defined($dsn) or !defined($user) or !defined($pass)) {
199 error
("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
202 push @ret, [$dsn, $user, $pass]
205 $self->{slave_list_cache
} = \
@ret;
212 die "Incapable of having slaves." unless $self->can_do_slaves;
214 return $self->{slave
} if $self->check_slave;
216 my @slaves_list = $self->_slaves_list;
218 # If we have no slaves, then return silently.
219 return unless @slaves_list;
221 foreach my $slave_fulldsn (@slaves_list) {
222 my $newslave = $self->{slave
} = $self->new_from_dsn_user_pass(@
$slave_fulldsn);
223 $self->{slave_next_check
} = 0;
224 $newslave->mark_as_slave;
226 if $self->check_slave;
229 warn "Slave list exhausted, failing back to master.";
236 return $self unless $self->can_do_slaves;
238 if ($self->{slave_ok
}) {
239 if (my $slave = $self->get_slave) {
240 $slave->{recheck_req_gen
} = $self->{recheck_req_gen
};
252 return unless ref $coderef eq 'CODE';
254 local $self->{slave_ok
} = 1;
256 return $coderef->(@_);
261 $self->{recheck_req_gen
}++;
268 if ($self->{recheck_done_gen
} != $self->{recheck_req_gen
}) {
269 $self->{dbh
} = undef unless $self->{dbh
}->ping;
270 # Handles a memory leak under Solaris/Postgres.
271 $self->{dbh
} = undef if ($self->{max_handles
} &&
272 $self->{handles_left
}-- < 0);
273 $self->{recheck_done_gen
} = $self->{recheck_req_gen
};
275 return $self->{dbh
} if $self->{dbh
};
278 $self->{dbh
} = DBI
->connect($self->{dsn
}, $self->{user
}, $self->{pass
}, {
281 # FUTURE: will default to on (have to validate all callers first):
282 RaiseError
=> ($self->{raise_errors
} || 0),
284 die "Failed to connect to database: " . DBI
->errstr;
285 $self->post_dbi_connect;
286 $self->{handles_left
} = $self->{max_handles
} if $self->{max_handles
};
292 return $self->dbh->ping;
296 my ($self, $optmsg) = @_;
297 my $dbh = $self->dbh;
298 return unless $dbh->err;
299 my ($pkg, $fn, $line) = caller;
300 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
301 $msg .= ": $optmsg" if $optmsg;
302 # Auto rollback failures around transactions.
303 if ($dbh->{AutoCommit
} == 0) { eval { $dbh->rollback }; }
308 my ($self, $sql, @do_params) = @_;
309 my $rv = eval { $self->dbh->do($sql, @do_params) };
310 return $rv unless $@
|| $self->dbh->err;
311 warn "Error with SQL: $sql\n";
312 Carp
::confess
($@
|| $self->dbh->errstr);
316 croak
("Odd number of parameters!") if scalar(@_) % 2;
317 my ($self, $vlist, %uarg) = @_;
319 $ret{$_} = delete $uarg{$_} foreach @
$vlist;
320 croak
("Bogus options: ".join(',',keys %uarg)) if %uarg;
324 sub was_deadlock_error
{
326 my $dbh = $self->dbh;
330 sub was_duplicate_error
{
332 my $dbh = $self->dbh;
336 # run a subref (presumably a database update) in an eval, because you expect it to
337 # maybe fail on duplicate key error, and throw a dup exception for you, else return
340 my ($self, $code) = @_;
341 my $rv = eval { $code->(); };
342 throw
("dup") if $self->was_duplicate_error;
346 # insert row if doesn't already exist
347 # WARNING: This function is NOT transaction safe if the duplicate errors causes
348 # your transaction to halt!
349 # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
350 # is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending on your database.
353 my ($self, $sql, @params) = @_;
354 my $dbh = $self->dbh;
355 if ($self->can_insertignore) {
356 return $dbh->do("INSERT IGNORE $sql", @params);
358 # TODO: Detect bad multi-row insert here.
359 my $rv = eval { $dbh->do("INSERT $sql", @params); };
360 if ($@
|| $dbh->err) {
361 return 1 if $self->was_duplicate_error;
362 # This chunk is identical to condthrow, but we include it directly
363 # here as we know there is definitely an error, and we would like
364 # the caller of this function.
365 my ($pkg, $fn, $line) = caller;
366 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
373 sub retry_on_deadlock
{
376 my $tries = shift || 3;
377 croak
("deadlock retries must be positive") if $tries < 1;
380 while ($tries-- > 0) {
381 $rv = eval { $code->(); };
382 next if ($self->was_deadlock_error);
384 croak
($@
) unless $self->dbh->err;
391 # --------------------------------------------------------------------------
395 sub add_extra_tables
{
397 push @extra_tables, @_;
400 use constant TABLES
=> qw( domain class file tempfile file_to_delete
401 unreachable_fids file_on file_on_corrupt host
402 device server_settings file_to_replicate
403 file_to_delete_later fsck_log file_to_queue
409 my $curver = $sto->schema_version;
411 my $latestver = SCHEMA_VERSION
;
412 if ($curver == $latestver) {
413 $sto->status("Schema already up-to-date at version $curver.");
417 if ($curver > $latestver) {
418 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
422 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
425 foreach my $t (TABLES
, @extra_tables) {
426 $sto->create_table($t);
429 $sto->upgrade_add_host_getport;
430 $sto->upgrade_add_host_altip;
431 $sto->upgrade_add_device_asof;
432 $sto->upgrade_add_device_weight;
433 $sto->upgrade_add_device_readonly;
434 $sto->upgrade_add_device_drain;
435 $sto->upgrade_add_class_replpolicy;
436 $sto->upgrade_modify_server_settings_value;
437 $sto->upgrade_add_file_to_queue_arg;
442 sub cached_schema_version
{
444 return $self->{_cached_schema_version
} ||=
445 $self->schema_version;
450 my $dbh = $self->dbh;
452 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
456 sub filter_create_sql
{ my ($self, $sql) = @_; return $sql; }
459 my ($self, $table) = @_;
460 my $dbh = $self->dbh;
461 return 1 if $self->table_exists($table);
462 my $meth = "TABLE_$table";
463 my $sql = $self->$meth;
464 $sql = $self->filter_create_sql($sql);
465 $self->status("Running SQL: $sql;");
467 die "Failed to create table $table: " . $dbh->errstr;
468 my $imeth = "INDEXES_$table";
469 my @indexes = eval { $self->$imeth };
470 foreach $sql (@indexes) {
471 $self->status("Running SQL: $sql;");
473 die "Failed to create indexes on $table: " . $dbh->errstr;
477 # Please try to keep all tables aligned nicely
478 # with '"CREATE TABLE' on the first line
479 # and ')"' alone on the last line.
482 # classes are tied to domains. domains can have classes of items
483 # with different mindevcounts.
485 # a minimum devcount is the number of copies the system tries to
486 # maintain for files in that class
488 # unspecified classname means classid=0 (implicit class), and that
489 # implies mindevcount=2
490 "CREATE TABLE domain (
491 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
492 namespace VARCHAR(255),
498 "CREATE TABLE class (
499 dmid SMALLINT UNSIGNED NOT NULL,
500 classid TINYINT UNSIGNED NOT NULL,
501 PRIMARY KEY (dmid,classid),
502 classname VARCHAR(50),
503 UNIQUE (dmid,classname),
504 mindevcount TINYINT UNSIGNED NOT NULL
508 # the length field is only here for easy verifications of content
509 # integrity when copying around. no sums or content types or other
510 # metadata here. application can handle that.
512 # classid is what class of file this belongs to. for instance, on fotobilder
513 # there will be a class for original pictures (the ones the user uploaded)
514 # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
515 # each domain can setup classes and assign the minimum redundancy level for
516 # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and a 1 minimum for derived images (which means the sole device
518 # for a derived image can die, bringing devcount to 0 for that file, but
519 # the application can recreate it from its original)
522 fid INT UNSIGNED NOT NULL,
525 dmid SMALLINT UNSIGNED NOT NULL,
526 dkey VARCHAR(255), # domain-defined
527 UNIQUE dkey (dmid, dkey),
529 length BIGINT UNSIGNED, # big limit
531 classid TINYINT UNSIGNED NOT NULL,
532 devcount TINYINT UNSIGNED NOT NULL,
533 INDEX devcount (dmid,classid,devcount)
538 "CREATE TABLE tempfile (
539 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
542 createtime INT UNSIGNED NOT NULL,
543 classid TINYINT UNSIGNED NOT NULL,
544 dmid SMALLINT UNSIGNED NOT NULL,
550 # files marked for death when their key is overwritten. then they get a new
551 # fid, but since the old row (with the old fid) had to be deleted immediately,
552 # we need a place to store the fid so an async job can delete the file from
554 sub TABLE_file_to_delete
{
555 "CREATE TABLE file_to_delete (
556 fid INT UNSIGNED NOT NULL,
561 # if the replicator notices that a fid has no sources, that file gets inserted
562 # into the unreachable_fids table. it is up to the application to actually
563 # handle fids stored in this table.
564 sub TABLE_unreachable_fids
{
565 "CREATE TABLE unreachable_fids (
566 fid INT UNSIGNED NOT NULL,
567 lastupdate INT UNSIGNED NOT NULL,
573 # what files are on what devices? (most likely physical devices,
574 # as logical devices of RAID arrays would be costly, and mogilefs
575 # already handles redundancy)
577 # the devid index lets us answer "What files were on this now-dead disk?"
579 "CREATE TABLE file_on (
580 fid INT UNSIGNED NOT NULL,
581 devid MEDIUMINT UNSIGNED NOT NULL,
582 PRIMARY KEY (fid, devid),
587 # if application or framework detects an error in one of the duplicate files
588 # for whatever reason, it can register its complaint and the framework
589 # will do some verifications and fix things up w/ an async job
590 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
591 # on the other devices?
592 sub TABLE_file_on_corrupt
{
593 "CREATE TABLE file_on_corrupt (
594 fid INT UNSIGNED NOT NULL,
595 devid MEDIUMINT UNSIGNED NOT NULL,
596 PRIMARY KEY (fid, devid)
600 # hosts (which contain devices...)
603 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
605 status ENUM('alive','dead','down'),
606 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
607 http_get_port MEDIUMINT UNSIGNED,
609 hostname VARCHAR(40),
621 "CREATE TABLE device (
622 devid MEDIUMINT UNSIGNED NOT NULL,
623 hostid MEDIUMINT UNSIGNED NOT NULL,
625 status ENUM('alive','dead','down'),
626 weight MEDIUMINT DEFAULT 100,
628 mb_total MEDIUMINT UNSIGNED,
629 mb_used MEDIUMINT UNSIGNED,
630 mb_asof INT UNSIGNED,
636 sub TABLE_server_settings
{
637 "CREATE TABLE server_settings (
638 field VARCHAR(50) PRIMARY KEY,
643 sub TABLE_file_to_replicate
{
644 # nexttry is time to try to replicate it next.
645 # 0 means immediate. it's only on one host.
646 # 1 means lower priority. it's on 2+ but isn't happy where it's at.
647 # unix timestamp means at/after that time. some previous error occurred.
648 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
649 # failcount. how many times we've failed, just for doing backoff of nexttry.
650 # flags. reserved for future use.
651 "CREATE TABLE file_to_replicate (
652 fid INT UNSIGNED NOT NULL PRIMARY KEY,
653 nexttry INT UNSIGNED NOT NULL,
655 fromdevid INT UNSIGNED,
656 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
657 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
661 sub TABLE_file_to_delete_later
{
662 "CREATE TABLE file_to_delete_later (
663 fid INT UNSIGNED NOT NULL PRIMARY KEY,
664 delafter INT UNSIGNED NOT NULL,
670 "CREATE TABLE fsck_log (
671 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
673 utime INT UNSIGNED NOT NULL,
674 fid INT UNSIGNED NULL,
676 devid MEDIUMINT UNSIGNED,
681 # generic queue table, designed to be used for workers/jobs which aren't
682 # constantly in use, and are async to the user.
683 # ie; fsck, drain, rebalance.
684 sub TABLE_file_to_queue
{
685 "CREATE TABLE file_to_queue (
686 fid INT UNSIGNED NOT NULL,
688 type TINYINT UNSIGNED NOT NULL,
689 nexttry INT UNSIGNED NOT NULL,
690 failcount TINYINT UNSIGNED NOT NULL default '0',
691 flags SMALLINT UNSIGNED NOT NULL default '0',
693 PRIMARY KEY (fid, type),
694 INDEX type_nexttry (type,nexttry)
698 # new style async delete table.
699 # this is separate from file_to_queue since deletes are more actively used,
700 # and partitioning on 'type' doesn't always work so well.
701 sub TABLE_file_to_delete2
{
702 "CREATE TABLE file_to_delete2 (
703 fid INT UNSIGNED NOT NULL PRIMARY KEY,
704 nexttry INT UNSIGNED NOT NULL,
705 failcount TINYINT UNSIGNED NOT NULL default '0',
706 INDEX nexttry (nexttry)
# The first five alters are only necessary for MySQL, since no other
# database existed before them; other drivers create the tables correctly
# to begin with, so a no-op success is fine here.
sub upgrade_add_host_getport    { return 1 }
sub upgrade_add_host_altip      { return 1 }
sub upgrade_add_device_asof     { return 1 }
sub upgrade_add_device_weight   { return 1 }
sub upgrade_add_device_readonly { return 1 }

# Newer alters postdate the non-MySQL drivers, so every store subclass
# must implement these; the base class refuses loudly.
sub upgrade_add_device_drain             { die "Not implemented in $_[0]" }
sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
sub upgrade_add_file_to_queue_arg        { die "Not implemented in $_[0]" }
723 sub upgrade_add_class_replpolicy
{
725 unless ($self->column_type("class", "replpolicy")) {
726 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
730 # return true if deleted, 0 if didn't exist, exception if error
732 my ($self, $hostid) = @_;
733 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
736 # return true if deleted, 0 if didn't exist, exception if error
738 my ($self, $dmid) = @_;
739 return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
742 sub domain_has_files
{
743 my ($self, $dmid) = @_;
744 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
746 return $has_a_fid ?
1 : 0;
# Return 1 if any file row exists in the given domain+class, else 0.
sub class_has_files
{
    my ($self, $dmid, $clid) = @_;
    # LIMIT 1: existence check only, no need to count every row.
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ?
    1 : 0;
756 # return new classid on success (non-zero integer), die on failure
757 # throw 'dup' on duplicate name
758 # override this if you want a less racy version.
760 my ($self, $dmid, $classname) = @_;
761 my $dbh = $self->dbh;
763 # get the max class id in this domain
764 my $maxid = $dbh->selectrow_array
765 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
767 # now insert the new class
769 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
770 undef, $dmid, $maxid + 1, $classname, 2);
772 if ($@
|| $dbh->err) {
773 if ($self->was_duplicate_error) {
777 return $maxid + 1 if $rv;
782 # return 1 on success, throw "dup" on duplicate name error, die otherwise
783 sub update_class_name
{
785 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
787 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
788 undef, $arg{classname
}, $arg{dmid
}, $arg{classid
});
790 throw
("dup") if $self->was_duplicate_error;
795 # return 1 on success, die otherwise
796 sub update_class_mindevcount
{
798 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
800 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
801 undef, $arg{mindevcount
}, $arg{dmid
}, $arg{classid
});
807 # return 1 on success, die otherwise
808 sub update_class_replpolicy
{
810 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
812 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
813 undef, $arg{replpolicy
}, $arg{dmid
}, $arg{classid
});
# Count of file rows in (domain, class) having exactly the given devcount.
sub nfiles_with_dmid_classid_devcount
{
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);
825 sub set_server_setting
{
826 my ($self, $key, $val) = @_;
827 my $dbh = $self->dbh;
828 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
832 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
834 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
838 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
842 # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
843 sub incr_server_setting
{
844 my ($self, $key, $val) = @_;
845 $val = 1 unless defined $val;
848 return 1 if $self->dbh->do("UPDATE server_settings ".
849 "SET value=value+? ".
850 "WHERE field=?", undef,
852 $self->set_server_setting($key, $val);
856 my ($self, $key) = @_;
857 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
861 # generic server setting cache.
862 # note that you can call the same server setting with different timeouts, but
863 # the timeout specified at the time of ... timeout, wins.
864 sub server_setting_cached
{
865 my ($self, $key, $timeout) = @_;
866 $self->{server_setting_cache
}->{$key} ||= {val
=> '', refresh
=> 0};
867 my $cache = $self->{server_setting_cache
}->{$key};
869 if ($now > $cache->{refresh
}) {
870 $cache->{val
} = $self->server_setting($key);
871 $cache->{refresh
} = $now + $timeout;
873 return $cache->{val
};
876 sub server_settings
{
879 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
881 while (my ($k, $v) = $sth->fetchrow_array) {
887 # register a tempfile and return the fidid, which should be allocated
888 # using autoincrement/sequences if the passed in fid is undef. however,
889 # if fid is passed in, that value should be used and returned.
891 # return new/passed in fidid on success.
892 # throw 'dup' if fid already in use
893 # return 0/undef/die on failure
895 sub register_tempfile
{
897 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
899 my $dbh = $self->dbh;
902 my $explicit_fid_used = $fid ?
1 : 0;
904 # setup the new mapping. we store the devices that we picked for
905 # this file in here, knowing that they might not be used. create_close
906 # is responsible for actually mapping in file_on. NOTE: fid is being
907 # passed in, it's either some number they gave us, or it's going to be
908 # 0/undef which translates into NULL which means to automatically create
909 # one. that should be fine.
910 my $ins_tempfile = sub {
912 # We must only pass the correct number of bind parameters
913 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
914 # Postgres, where you are expected to leave it out or use DEFAULT
915 # Leaving it out seems sanest and least likely to cause problems
916 # with other databases.
917 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
918 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
919 my @vals = ($arg{dmid
}, $arg{key
}, $arg{classid
} || 0, $arg{devids
});
920 # Do not check for $explicit_fid_used, but rather $fid directly
921 # as this anonymous sub is called from the loop later
923 unshift @keys, 'fid';
927 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
928 $dbh->do($sql, undef, @vals);
931 return undef if $self->was_duplicate_error;
932 die "Unexpected db error into tempfile: " . $dbh->errstr;
935 unless (defined $fid) {
936 # if they did not give us a fid, then we want to grab the one that was
937 # theoretically automatically generated
938 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
939 or die "No last_insert_id found";
941 return undef unless defined $fid && $fid > 0;
945 unless ($ins_tempfile->()) {
946 throw
("dup") if $explicit_fid_used;
947 die "tempfile insert failed";
950 my $fid_in_use = sub {
951 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
952 return $exists ?
1 : 0;
955 # if the fid is in use, do something
956 while ($fid_in_use->($fid)) {
957 throw
("dup") if $explicit_fid_used;
959 # be careful of databases which reset their
960 # auto-increment/sequences when the table is empty (InnoDB
961 # did/does this, for instance). So check if it's in use, and
962 # re-seed the table with the highest known fid from the file
965 # get the highest fid from the filetable and insert a dummy row
966 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
967 $ins_tempfile->(); # don't care about its result
969 # then do a normal auto-increment
971 $ins_tempfile->() or die "register_tempfile failed after seeding";
977 # return hashref of row containing columns "fid, dmid, dkey, length,
978 # classid, devcount" provided a $dmid and $key (dkey). or undef if no
980 sub file_row_from_dmid_key
{
981 my ($self, $dmid, $key) = @_;
982 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
983 "FROM file WHERE dmid=? AND dkey=?",
987 # return hashref of row containing columns "fid, dmid, dkey, length,
988 # classid, devcount" provided a $fidid or undef if no row.
989 sub file_row_from_fidid
{
990 my ($self, $fidid) = @_;
991 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
992 "FROM file WHERE fid=?",
# Return an arrayref of hashrefs (columns "fid, dmid, dkey, length,
# classid, devcount") for file rows with fid in [$fromfid, $tofid].
# The arrayref is empty when no rows match (fetchall_arrayref never
# returns undef after a successful execute).
sub file_row_from_fidid_range
{
    my ($self, $fromfid, $tofid) = @_;
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid BETWEEN ? AND ?");
    $sth->execute($fromfid,$tofid);
    return $sth->fetchall_arrayref({});
1006 # return array of devids that a fidid is on
1008 my ($self, $fidid) = @_;
1009 return @
{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
1010 undef, $fidid) || [] };
1013 # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1014 sub fid_devids_multiple
{
1015 my ($self, @fidids) = @_;
1016 my $in = join(",", map { $_+0 } @fidids);
1018 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1020 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1021 push @
{$ret->{$fidid} ||= []}, $devid;
# Return a hashref of columns (classid, dmid, dkey) for the tempfile row
# with the given fid, or undef when no such row exists.
sub tempfile_row_from_fid
{
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
                                         "FROM tempfile WHERE fid=?",
1034 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
1036 my ($self, $devid, $hostid, $status) = @_;
1037 my $rv = $self->conddup(sub {
1038 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1039 $devid, $hostid, $status);
1042 die "error making device $devid\n" unless $rv > 0;
1046 sub update_device_usage
{
1048 my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
1050 $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
1051 " WHERE devid = ?", undef, $arg{mb_total
}, $arg{mb_used
}, $arg{devid
});
1056 sub mark_fidid_unreachable
{
1057 my ($self, $fidid) = @_;
1058 die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
1059 $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
1063 sub set_device_weight
{
1064 my ($self, $devid, $weight) = @_;
1066 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1071 sub set_device_state
{
1072 my ($self, $devid, $state) = @_;
1074 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1080 my ($self, $dmid, $cid) = @_;
1082 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1088 my ($self, $fidid) = @_;
1089 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
1091 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1093 $self->enqueue_for_delete2($fidid, 0);
1097 sub delete_tempfile_row
{
1098 my ($self, $fidid) = @_;
1099 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1104 # Load the specified tempfile, then delete it. If we succeed, we were
1105 # here first; otherwise, someone else beat us here (and we return undef)
1106 sub delete_and_return_tempfile_row
{
1107 my ($self, $fidid) = @_;
1108 my $rv = $self->tempfile_row_from_fid($fidid);
1109 my $rows_deleted = $self->delete_tempfile_row($fidid);
1110 return $rv if ($rows_deleted > 0);
1113 sub replace_into_file
{
1115 my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
1116 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1118 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1119 "VALUES (?,?,?,?,?,0) ", undef,
1120 @arg{'fidid', 'dmid', 'key', 'length', 'classid'});
1125 # returns 1 on success, 0 on duplicate key error, dies on exception
1126 # TODO: need a test to hit the duplicate name error condition
1127 # TODO: switch to using "dup" exception here?
1129 my ($self, $fidid, $to_key) = @_;
1130 my $dbh = $self->dbh;
1132 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1133 undef, $to_key, $fidid);
1135 if ($@
|| $dbh->err) {
1136 # first is MySQL's error code for duplicates
1137 if ($self->was_duplicate_error) {
1147 # returns a hash of domains. Key is namespace, value is dmid.
1148 sub get_all_domains
{
1150 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1151 return map { ($_->[0], $_->[1]) } @
{$domains || []};
1154 # returns an array of hashrefs, one hashref per row in the 'class' table
1155 sub get_all_classes
{
1160 if ($self->cached_schema_version >= 10) {
1161 $repl_col = ", replpolicy";
1164 my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
1166 push @ret, $row while $row = $sth->fetchrow_hashref;
# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}
# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                                   undef, $fidid, $devid); };
    # Re-throw any DB error captured by the eval above.
    $self->condthrow;
    return $rv;
}
# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}
# update the device count for a given fidid by recounting file_on rows.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);
    eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
                    $ct, $fidid); };
    $self->condthrow;
    return 1;
}
# update the classid for a given fidid
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $dbh = $self->dbh;
    $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
             $classid, $fidid);
    $self->condthrow;
    return 1;
}
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    $in ||= 0;   # default to "now"; int() below would warn on undef
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# this one more.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;
    $in ||= 0;   # default to "now"
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
                             "VALUES (?,$nexttry)", undef, $fidid);
    });
}
# enqueue a fidid for work of the given $type.  $fidid may be a plain
# fid, or an arrayref of [fid, devid, arg] for queue entries that carry
# extra data.
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;
    $in ||= 0;
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->retry_on_deadlock(sub {
        if (ref($fidid)) {
            $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
                                 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                 $fidid->[0], $fidid->[1], $fidid->[2], $type);
        } else {
            $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
                                 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
    });
}
1297 # return 1 on success. die otherwise.
# NOTE(review): this region was damaged in extraction — several physical
# lines (the fallback "return", the else-branch header, and the
# multi-line SQL tail) are missing, and statements are hard-wrapped.
# Reconstruct against upstream before relying on it.
1298 sub enqueue_many_for_todo
{
1299 my ($self, $fidids, $type, $in) = @_;
# Fall back to per-fid enqueue when multi-row insert-ignore/replace
# isn't safely supported by this store.
1300 if (@
$fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
1301 $self->enqueue_for_todo($_, $type, $in) foreach @
$fidids;
1306 my $nexttry = $self->unix_timestamp . " + " . int($in);
1308 # TODO: convert to prepared statement?
1309 $self->retry_on_deadlock(sub {
# Arrayref elements mean [fid, devid, arg] tuples; scalars mean bare fids.
1310 if (ref($fidids->[0]) eq 'ARRAY') {
1311 my $sql = $self->ignore_replace .
1312 "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1313 join(', ', ('(?,?,?,?,?)') x
scalar @
$fidids);
1314 $self->dbh->do($sql, undef, map { @
$_, $type, $nexttry } @
$fidids);
# presumably the else-branch (plain fid list) — header line lost.
1316 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1318 join(",", map { "(" . int($_->{fid
}) . ", $type, $nexttry)" } @
$fidids));
# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only ran
# from a single tracker.
sub file_queue_length {
    my ($self, $type) = @_;
    return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
                                       "WHERE type = ?", undef, $type);
}
# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;
    $self->retry_on_deadlock(sub {
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                              " WHERE nexttry > " . $self->unix_timestamp);
    });
}
# takes two arguments, devid and limit, both required. returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    my $dbh = $self->dbh;
    # $limit is interpolated; callers must pass a trusted integer.
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}
# finds a chunk of fids given a set of constraints:
# devid, fidid, age (new or old), limit
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing a new index.
# returns an arrayref of fidids
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = '';
    my $order   = '';
    if ($age eq 'old') {
        $fidsort = 'AND fid > ?' if $fidid;
        $order   = 'ASC';
    } elsif ($age eq 'new') {
        $fidsort = 'AND fid < ?' if $fidid;
        $order   = 'DESC';
    } else {
        croak("invalid age argument: " . $age);
    }
    # NOTE(review): default limit reconstructed — confirm upstream value.
    $limit ||= 100;

    my @extra = ();
    push @extra, $fidid if $fidid;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
    return $fidids;
}
# takes two arguments, fidid to be above, and optional limit (default
# 1,000). returns up to that many fidids above the provided
# fidid. returns array of MogileFS::FID objects, sorted by fid ids.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;   # default per the contract documented above
    $limit = int($limit);

    my @ret;
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                            "FROM file WHERE fid > ? ".
                            "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, MogileFS::FID->new_from_db_row($row);
    }
    return @ret;
}
# Same as above, but returns an unblessed arrayref of fids.
sub get_fidids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;   # NOTE(review): default reconstructed to match get_fids_above_id — confirm
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file WHERE fid > ?
                                             ORDER BY fid LIMIT $limit},
                                          undef, $fidid);
    return $fidids;
}
# creates a new domain, given a domain namespace string. return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}
# Set a single column on a host row; throws 'dup' via conddup on
# duplicate-key errors.
# SECURITY NOTE: $col is interpolated into the SQL — callers must pass
# only trusted, validated column names.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}
# return new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
    die "db failure in create_host";
}
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    # NOTE(review): ORDER BY/LIMIT clause reconstructed (original lines
    # lost) — confirm against upstream.
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}
1484 # "new" style queue consumption code.
1485 # from within a transaction, fetch a limit of fids,
1486 # then update each fid's nexttry to be off in the future,
1487 # giving local workers some time to dequeue the items.
1489 # DBI (even with RaiseError) returns weird errors on
1490 # deadlocks from selectall_hashref. So we can't do that.
1491 # we also used to retry on deadlock within the routine,
1492 # but instead lets return undef and let job_master retry.
# NOTE(review): this sub was damaged in extraction — the leading shifts
# ($self/$queue/$limit), the transaction begin/eval, the SELECT body and
# the commit/error tail are missing lines.  Do not restyle without the
# upstream text.
1493 sub grab_queue_chunk
{
1497 my $extfields = shift;
1499 my $dbh = $self->dbh;
# $extwhere is the 4th positional arg (shift order still matches callers).
1503 my $extwhere = shift || '';
1504 my $fields = 'fid, nexttry, failcount';
1505 $fields .= ', ' . $extfields if $extfields;
1508 my $ut = $self->unix_timestamp;
1509 my $sth = $dbh->prepare(qq{
1512 WHERE nexttry
<= $ut
1519 $work = $sth->fetchall_hashref('fid');
1520 # Nothing to work on.
1521 # Now claim the fids for a while.
1522 # TODO: Should be configurable... but not necessary.
1523 my $fidlist = join(',', keys %$work);
1524 unless ($fidlist) { $dbh->commit; return; }
# Claim window: push nexttry 1000s out so other trackers skip these rows.
1525 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1528 if ($self->was_deadlock_error) {
1529 eval { $dbh->rollback };
1534 return defined $work ?
values %$work : ();
# Pull a batch of replication-queue rows via the shared queue-chunk
# grabber, including the replication-specific columns.
sub grab_files_to_replicate {
    my ($self, $max) = @_;
    return $self->grab_queue_chunk('file_to_replicate', $max,
                                   'fromdevid, flags');
}
# Pull a batch of rows from the new-style delete queue.
sub grab_files_to_delete2 {
    my ($self, $max) = @_;
    return $self->grab_queue_chunk('file_to_delete2', $max);
}
# Pull a batch from the generic work queue for one job $type.
# ($extwhere is ugly... but should be fine.)
# NOTE: $type is interpolated into SQL; callers pass trusted integers.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    $what ||= 'type, flags';
    return $self->grab_queue_chunk('file_to_queue', $limit,
                                   $what, 'AND type = ' . $type);
}
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    # Base class does no locking and always says "go ahead".
    return 1;
}
# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways. so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    # Base class took no lock above, so there is nothing to release here.
    my ($self, $fidid) = @_;
}
# Remove a fid from the replication queue, retrying if the delete hits
# a deadlock.
sub delete_fid_from_file_to_replicate {
    my ($self, $fid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fid);
    });
}
# Remove one (fid, type) entry from the generic work queue, retrying on
# deadlock.
sub delete_fid_from_file_to_queue {
    my ($self, $fid, $queue_type) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
                       undef, $fid, $queue_type);
    });
}
# Remove a fid from the new-style delete queue, retrying on deadlock.
sub delete_fid_from_file_to_delete2 {
    my ($self, $fid) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fid);
    });
}
# Push a queued replication out to an absolute unix time and bump its
# failure counter; retried on deadlock.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $when) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $when, $fid);
    });
}
# Push a queued replication out N seconds from the DB's current time and
# bump its failure counter; retried on deadlock.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $secs_from_now) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $secs_from_now, $fid);
    });
}
# Push a queued delete2 entry out to an absolute unix time and bump its
# failure counter; retried on deadlock.
sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $when) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                       undef, $when, $fid);
    });
}
# Push a queued delete2 entry out N seconds from the DB's current time
# and bump its failure counter; retried on deadlock.
sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $secs_from_now) = @_;
    $self->retry_on_deadlock(sub {
        $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
                       "failcount = failcount + 1 WHERE fid = ?",
                       undef, $secs_from_now, $fid);
    });
}
# Given a dmid, prefix, after and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix = '' unless defined $prefix;
    $prefix .= '%';
    $after = '' unless defined $after;
    $limit = int($limit);   # $limit is interpolated below; force integer

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_ago seconds ago or older.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    # Fall back to one-at-a-time when the backend can't multi-insert.
    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}
# Persist the current schema version number in server_settings.
# NB: the misspelled name ("vesion") is this method's established public
# API, so it is preserved for caller compatibility.
sub set_schema_vesion {
    my ($self, $version) = @_;
    $self->set_server_setting("schema_version", int($version));
}
1693 # returns array of fidids to try and delete again
# NOTE(review): fragment — the receiver unpack, the SELECT column list,
# the LIMIT clause and the closing of the qq{} block were lost in
# extraction; restore from upstream before editing.
1694 sub fids_to_delete_again
{
1696 my $ut = $self->unix_timestamp;
# Dereferences the selectcol result into a plain list for the caller.
1697 return @
{ $self->dbh->selectcol_arrayref(qq{
1699 FROM file_to_delete_later
# Only rows whose deferred-delete time has already passed.
1700 WHERE delafter
< $ut
# return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # skipped — so fall back to inserting one at a time.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        # int() both sanitizes and avoids placeholder-count limits.
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# Like enqueue_fids_to_delete, but for the new-style file_to_delete2
# queue, which also records when the delete should next be attempted.
# return 1 on success. die otherwise.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # skipped — so fall back to inserting one at a time.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, nexttry) VALUES " .
                       join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
1752 # FIXME: Fsck log entries are processed a little out of order.
1753 # Once a fsck has completed, the log should be re-summarized.
# NOTE(review): fragment — the sub header/receiver, several blank-line
# separators, the closing braces and the final return were lost in
# extraction; restore from upstream before editing.
1754 sub fsck_log_summarize
{
# Take a short-lived named lock so only one tracker summarizes at a time.
1757 my $lockname = 'mgfs:fscksum';
1758 my $lock = eval { $self->get_lock($lockname, 10) };
1759 return 0 if defined $lock && $lock == 0;
1761 my $logid = $self->max_fsck_logid;
1763 # sum-up evcode counts every so often, to make fsck_status faster,
1764 # avoiding a potentially-huge GROUP BY in the future..
1765 my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
# Resume from the last processed logid (watermark kept in server_settings).
1767 my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
1769 my $cts = $self->fsck_evcode_counts(logid_range
=> [$min_logid, $logid]); # inclusive notation :)
1770 while (my ($evcode, $ct) = each %$cts) {
1771 $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
1773 $self->set_server_setting("fsck_logid_processed", $logid);
1775 $self->release_lock($lockname) if $lock;
# Insert one fsck_log row.  Named opts consumed via delete; any leftover
# key is a caller bug and croaks.
sub fsck_log {
    my ($self, %opts) = @_;
    # NOTE(review): the middle bind arguments (the keys deleted from
    # %opts before 'devid') were lost in extraction and are reconstructed
    # here — confirm the key names against callers before trusting this.
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef,
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;
}
# Ask the database for its idea of the current unix time (useful for
# detecting clock skew between trackers and the DB).
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}
# Highest fid currently in the 'file' table (undef when the table is
# empty).  NOTE(review): the sub header was lost in extraction; the name
# is reconstructed — confirm against callers.
sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}
# Highest logid in fsck_log, or 0 when the table is empty.
sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}
# returns array of $row hashrefs, from fsck_log table, with logid > $after_logid,
# in logid order, up to $limit rows (default 100).
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}
# Count fsck_log rows grouped by evcode, filtered either by a minimum
# utime (time_gte) or an inclusive logid range (logid_range).
# NOTE(review): fragment — the opts validation, branch headers
# (presumably if ($timegte) / if ($logr)), GROUP BY clauses, result-hash
# accumulation and return were lost in extraction; restore from upstream
# before editing.
1827 sub fsck_evcode_counts
{
1828 my ($self, %opts) = @_;
1829 my $timegte = delete $opts{time_gte
};
1830 my $logr = delete $opts{logid_range
};
# Branch 1: filter by time — executes with the time_gte bound value.
1836 $sth = $self->dbh->prepare(qq{
1837 SELECT evcode
, COUNT
(*) FROM fsck_log
1841 $sth->execute($timegte||0);
# Branch 2: filter by inclusive logid range.
1844 $sth = $self->dbh->prepare(qq{
1845 SELECT evcode
, COUNT
(*) FROM fsck_log
1846 WHERE logid
>= ? AND logid
<= ?
1849 $sth->execute($logr->[0], $logr->[1]);
# Accumulate evcode => count pairs from whichever statement ran.
1851 while (my ($ev, $ct) = $sth->fetchrow_array) {
# run before daemonizing. you can die from here if you see something's
# amiss, or emit warnings.  Base implementation is a deliberate no-op;
# subclasses override it with backend-specific sanity checks.
sub pre_daemonize_checks { }
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding.
# Base class has no locking primitive; subclasses must override.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}
# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
# Base class has no locking primitive; subclasses must override.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# returns up to $limit @fidids which are on provided $devid
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random. and good chances these will
    # eventually get to point where they're un-rebalance-able, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []
    });

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;
}
1902 MogileFS::Store - data storage provider. base class.
1906 MogileFS aims to be database-independent (though currently as of late
1907 2006 only works with MySQL). In the future, the server will create a
1908 singleton instance of type "MogileFS::Store", like
1909 L<MogileFS::Store::MySQL>, and all database interaction will be
1914 L<MogileFS::Store::MySQL>