add UploadCPAN back to .shipit
[MogileFS-Server.git] / lib / MogileFS / Store.pm
blobb118c313b2c8a1347a8198ff6f7851542e02ce85
1 package MogileFS::Store;
2 use strict;
3 use warnings;
4 use Carp qw(croak);
5 use MogileFS::Util qw(throw max error);
6 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
7 use List::Util ();
# Current database schema version. Bump it whenever the schema changes;
# the server refuses to start against a database with an older schema.
#
# Version history:
#    6: adds file_to_replicate table
#    7: adds file_to_delete_later table
#    8: adds fsck_log table
#    9: adds 'drain' state to enum in device table
#   10: adds 'replpolicy' column to 'class' table
#   11: adds 'file_to_queue' table
#   12: adds 'file_to_delete2' table
#   13: modifies 'server_settings.value' to TEXT for wider values, and
#       adds a TEXT 'arg' column to file_to_queue for passing arguments
use constant { SCHEMA_VERSION => 13 };
# Construct a store from the server configuration: the db_dsn, db_user,
# db_pass and max_handles values read via MogileFS->config.
sub new {
    my $class = shift;
    my @conn_args = map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles);
    return $class->new_from_dsn_user_pass(@conn_args);
}
# Build a store for $dsn. The implementation subclass is chosen from the DBI
# driver named at the start of the DSN (mysql, SQLite, Oracle or Pg, matched
# case-insensitively), loaded at runtime, blessed and init()ed.
# Dies with "Unknown database type" for any other driver, or if the
# subclass fails to load.
# $max_handles is optional; dbh() uses it to recycle handles after that
# many rechecks.
sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass, $max_handles) = @_;

    # DSN prefix => implementation class, tried in order; first match wins.
    my @drivers = (
        [ qr/^DBI:mysql:/i,  "MogileFS::Store::MySQL"    ],
        [ qr/^DBI:SQLite:/i, "MogileFS::Store::SQLite"   ],
        [ qr/^DBI:Oracle:/i, "MogileFS::Store::Oracle"   ],
        [ qr/^DBI:Pg:/i,     "MogileFS::Store::Postgres" ],
    );

    my $subclass;
    for my $driver (@drivers) {
        my ($prefix_re, $impl) = @$driver;
        next unless $dsn =~ $prefix_re;
        $subclass = $impl;
        last;
    }
    die "Unknown database type: $dsn" unless defined $subclass;

    # String eval is deliberate: the module name is only known at runtime.
    eval "use $subclass; 1"
        or die "Error loading $subclass: $@\n";

    my %fields = (
        dsn                  => $dsn,
        user                 => $user,
        pass                 => $pass,
        max_handles          => $max_handles,   # rechecks a handle survives (false = unlimited)
        raise_errors         => $subclass->want_raise_errors,
        slave_list_cachetime => 0,
        slave_list_cache     => [],
        recheck_req_gen      => 0,   # bumped by recheck_dbh() to request a handle check
        recheck_done_gen     => 0,   # request generation dbh() last serviced
        handles_left         => 0,   # rechecks left before dbh() drops the handle
        server_setting_cache => {},  # per-key cache for server_setting_cached()
    );
    my $self = bless \%fields, $subclass;
    $self->init;
    return $self;
}
# Whether stores of this class should run with DBI's RaiseError on.
# Defaults to true now; subclasses may override.
sub want_raise_errors { return 1; }
# Create (or reuse) a store for the mogdbsetup tool.
#
# %args keys: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass.
# First tries to log in as the regular user. If that works, the database
# already exists and that store is returned (the upgrade case). Otherwise
# it:
#   1. connects as root;
#   2. confirms, then creates the database;
#   3. confirms, then grants privileges to the regular user;
#   4. tries the regular login again.
# Dies if the root login fails, a confirmation is refused, or the final
# login still fails.
sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    my @location = @args{qw(dbname dbhost dbport)};
    my $dsn = $class->dsn_of_dbhost(@location);

    # Returns a RaiseError-enabled store if the regular user can connect,
    # undef otherwise.
    my $store_as_user = sub {
        my $probe = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return undef;
        my $store = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        $store->raise_errors;
        return $store;
    };

    # upgrading, apparently, as this database already exists.
    if (my $existing = $store_as_user->()) {
        return $existing;
    }

    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user. trying root...");
    my $rootdsn = $class->dsn_of_root(@location);
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create/Upgrade database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    my $sto = $store_as_user->()
        or die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
    return $sto;
}
# Given a root DBI connection, create the named database. Succeeds if it
# is made or already exists; dies otherwise. Returns do()'s result.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    my $ok = $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname");
    die "Failed to create database '$dbname': " . $rdbh->errstr . "\n" unless $ok;
    return $ok;
}
# Grant $user (identified by $pass) all privileges on $dbname, connecting
# both from any host ('%') and from 'localhost'. Dies on the first failed
# GRANT; otherwise returns the result of the last one.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    my $rv;
    for my $host ('%', 'localhost') {
        $rv = $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'$host' IDENTIFIED BY ?",
                        undef, $pass)
            or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    }
    return $rv;
}
# SQL-dialect capability flags. The base class supports none of them;
# database subclasses override the ones they implement.
sub can_replace      { return 0; }
sub can_insertignore { return 0; }
sub can_insert_multi { return 0; }

# SQL expression for the database's current unix time. Must be provided
# by each database subclass.
sub unix_timestamp {
    my $class = shift;
    die "No function in $class to return DB's unixtime.";
}
# Pick the statement prefix for an insert that tolerates existing rows:
# "INSERT IGNORE " when supported, else "REPLACE ", else die.
sub ignore_replace {
    my $self = shift;
    if ($self->can_insertignore) {
        return "INSERT IGNORE ";
    }
    elsif ($self->can_replace) {
        return "REPLACE ";
    }
    die "Can't INSERT IGNORE or REPLACE?";
}
# User-interaction hooks. They are file-scoped lexicals, so they are shared
# by every store class rather than stored per object. By default status
# messages are discarded and every confirmation is granted.
my $on_status  = sub { };
my $on_confirm = sub { 1 };

# Install a new status / confirm callback (invocant ignored).
sub on_status  { my (undef, $handler) = @_; return $on_status  = $handler; }
sub on_confirm { my (undef, $handler) = @_; return $on_confirm = $handler; }

# Report a progress message through the status hook.
sub status { my (undef, $msg) = @_; return $on_status->($msg); }

# Ask the confirm hook; a false answer dies with "Aborted.\n".
sub confirm {
    my (undef, $msg) = @_;
    return $on_confirm->($msg) || die "Aborted.\n";
}
# The schema version this code expects (SCHEMA_VERSION).
sub latest_schema_version { return SCHEMA_VERSION; }
# Turn RaiseError on both for future connections (the raise_errors flag
# dbh() passes to DBI->connect) and for the current handle.
sub raise_errors {
    my $self = shift;
    $self->{raise_errors} = 1;
    return $self->dbh->{RaiseError} = 1;
}

# Connection parameters this store was built with.
sub dsn  { return $_[0]->{dsn};  }
sub user { return $_[0]->{user}; }
sub pass { return $_[0]->{pass}; }

# Subclass hooks: called after construction / after each new DBI connection.
sub init             { return 1; }
sub post_dbi_connect { return 1; }

# Whether this store type supports reading from slave databases.
sub can_do_slaves { return 0; }
# Flag this store as a slave connection; dies if the store type cannot
# do slaves. Returns 1.
sub mark_as_slave {
    my $self = shift;
    $self->can_do_slaves
        or die "Incapable of becoming slave.";
    return $self->{slave} = 1;
}

# True when mark_as_slave() has been called on this store.
sub is_slave { return $_[0]->{slave}; }
# Returns a list of arrayrefs, each being [$dsn, $username, $password] for
# connecting to a slave DB.
#
# The list comes from server settings:
#   - 'slave_keys' is a comma-separated list of keys;
#   - each key has a 'slave_<key>' setting of the form "dsn|user|pass".
# Missing or malformed entries are reported via error() and skipped. The
# result is cached on the object and rebuilt at most every 15 seconds.
sub _slaves_list {
    my $self = shift;
    my $now  = time();

    # only reload every 15 seconds.
    return @{ $self->{slave_list_cache} }
        if $self->{slave_list_cachetime} > $now - 15;

    $self->{slave_list_cachetime} = $now;
    $self->{slave_list_cache}     = [];

    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return ();

    my @slaves;
    for my $key (split /\s*,\s*/, $sk) {
        my $setting = MogileFS::Config->server_setting("slave_$key");
        unless ($setting) {
            error("key for slave DB config: slave_$key not found in configuration");
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $setting;
        unless (defined $dsn && defined $user && defined $pass) {
            error("key slave_$key contains $setting, which doesn't split in | into DSN|user|pass - ignoring");
            next;
        }
        push @slaves, [ $dsn, $user, $pass ];
    }

    $self->{slave_list_cache} = \@slaves;
    return @slaves;
}
# Return a usable slave store, or nothing if none is available.
#
# Dies if this store type cannot do slaves. Reuses the current slave while
# check_slave() passes. Otherwise it tries each configured slave from
# _slaves_list() in order, installing it as $self->{slave} and returning
# the first one that passes check_slave(). Warns and returns empty when
# every configured slave fails, so callers fall back to the master.
sub get_slave {
    my $self = shift;

    die "Incapable of having slaves." unless $self->can_do_slaves;

    return $self->{slave} if $self->check_slave;

    my @candidates = $self->_slaves_list;

    # If we have no slaves, then return silently.
    return unless @candidates;

    for my $conn_args (@candidates) {
        my $candidate = $self->new_from_dsn_user_pass(@$conn_args);
        $self->{slave}            = $candidate;
        $self->{slave_next_check} = 0;
        $candidate->mark_as_slave;
        return $candidate if $self->check_slave;
    }

    warn "Slave list exhausted, failing back to master.";
    return;
}
# Store to use for reads. Returns a slave only inside slaves_ok() on a
# store type that can do slaves, and only if get_slave() finds one. In that
# case the master's pending recheck generation is copied onto the slave.
# Otherwise returns $self.
sub read_store {
    my $self = shift;

    return $self unless $self->can_do_slaves;
    return $self unless $self->{slave_ok};

    my $slave = $self->get_slave or return $self;
    $slave->{recheck_req_gen} = $self->{recheck_req_gen};
    return $slave;
}

# Run $coderef with read_store() allowed to hand out slaves, passing any
# remaining arguments through. Returns the coderef's result, or nothing if
# $coderef is not a CODE ref.
sub slaves_ok {
    my ($self, $coderef) = splice @_, 0, 2;

    return if ref($coderef) ne 'CODE';

    local $self->{slave_ok} = 1;
    return $coderef->(@_);
}
# Ask dbh() to re-validate its cached handle on its next call.
# Returns the previous request generation.
sub recheck_dbh {
    my $self = shift;
    return $self->{recheck_req_gen}++;
}

# Return a live DBI handle, connecting on demand.
#
# A cached handle is reused until recheck_dbh() bumps the request
# generation. At that point the handle is pinged, and dropped if the ping
# fails. With max_handles set it is also dropped once it has survived that
# many rechecks (a workaround for a Solaris/Postgres memory leak). Dies if
# a new connection cannot be made.
sub dbh {
    my $self = shift;

    if ($self->{dbh} && $self->{recheck_done_gen} != $self->{recheck_req_gen}) {
        $self->{dbh} = undef unless $self->{dbh}->ping;
        # Handles a memory leak under Solaris/Postgres.
        $self->{dbh} = undef
            if $self->{max_handles} && $self->{handles_left}-- < 0;
        $self->{recheck_done_gen} = $self->{recheck_req_gen};
    }
    return $self->{dbh} if $self->{dbh};

    my %attr = (
        PrintError => 0,
        AutoCommit => 1,
        # FUTURE: will default to on (have to validate all callers first):
        RaiseError => ($self->{raise_errors} || 0),
    );
    $self->{dbh} = DBI->connect(@{$self}{qw(dsn user pass)}, \%attr)
        or die "Failed to connect to database: " . DBI->errstr;
    $self->post_dbi_connect;
    $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
    return $self->{dbh};
}

# Ping the current (possibly freshly connected) handle.
sub ping {
    my $self = shift;
    return $self->dbh->ping;
}
# If the handle has a pending error, croak with it. The message names our
# caller's package/file/line and appends $optmsg when given. Any open
# transaction is rolled back first. Returns nothing when there is no error.
sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    $dbh->err or return;

    my ($pkg, $file, $line) = caller;
    my $msg = "Database error from $pkg/$file/$line: " . $dbh->errstr
            . ($optmsg ? ": $optmsg" : '');

    # Auto rollback failures around transactions.
    eval { $dbh->rollback } if $dbh->{AutoCommit} == 0;

    croak($msg);
}
# Run $dbh->do($sql, @do_params) and return its result. On any failure
# (an exception or a pending DBI error) it warns with the SQL and then
# confesses with the error and a full stack trace.
sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    my $error = $@;
    if (!$error && !$self->dbh->err) {
        return $rv;
    }
    warn "Error with SQL: $sql\n";
    Carp::confess($error || $self->dbh->errstr);
}
# Validate named arguments against an allow-list.
#   $vlist - arrayref of permitted option names
#   %uarg  - the caller-supplied name => value pairs
# Returns a hash with every permitted name (undef when not supplied).
# Croaks on an odd-length argument list or on any unknown option.
sub _valid_params {
    croak("Odd number of parameters!") if @_ % 2;
    my ($self, $allowed, %given) = @_;

    my %picked;
    for my $name (@$allowed) {
        $picked{$name} = delete $given{$name};
    }

    if (my @bogus = keys %given) {
        croak("Bogus options: " . join(',', @bogus));
    }
    return %picked;
}
# Whether the last database error was a deadlock. Database subclasses must
# override this; the base version obtains the handle and then dies.
sub was_deadlock_error {
    my ($self) = @_;
    $self->dbh;    # obtain (and if needed connect) the handle first
    die "UNIMPLEMENTED";
}

# Whether the last database error was a duplicate-key error. Database
# subclasses must override this; the base version obtains the handle and
# then dies.
sub was_duplicate_error {
    my ($self) = @_;
    $self->dbh;    # obtain (and if needed connect) the handle first
    die "UNIMPLEMENTED";
}

# run a subref (presumably a database update) in an eval, because you expect
# it to maybe fail on duplicate key error, and throw a dup exception for you,
# else return its return value. Other exceptions are swallowed here (the
# result is then undef), so callers follow up with condthrow().
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->() };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $rv;
}
# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if the duplicate errors causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending on your database.
#
# $sql is the statement text after the INSERT keyword, e.g.
# "INTO t (a) VALUES (?)". @params are passed straight through to
# $dbh->do() (presumably \%attr then bind values, per DBI).
# Returns do()'s result, or 1 when a duplicate-key error was ignored.
# Croaks, reporting our caller's location, on any other error.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;

    return $dbh->do("INSERT IGNORE $sql", @params)
        if $self->can_insertignore;

    # TODO: Detect bad multi-row insert here.
    my $rv = eval { $dbh->do("INSERT $sql", @params) };
    # Save the eval error now: the checks below may run evals of their own.
    my $eval_err = $@;
    if ($eval_err || $dbh->err) {
        return 1 if $self->was_duplicate_error;

        # Same message shape as condthrow, built here so it names the caller
        # of insert_ignore. When DBI has no errstr (the eval died for a
        # non-DBI reason), fall back to the eval error so the message is
        # never empty.
        my ($pkg, $fn, $line) = caller;
        my $reason = $dbh->errstr;
        if (!defined $reason || $reason eq '') {
            ($reason = "$eval_err") =~ s/\s+\z//;
        }
        croak("Database error from $pkg/$fn/$line: $reason");
    }
    return $rv;
}
# Run $code up to $tries times (default 3), retrying while the failure is a
# deadlock. A non-database exception is re-croaked. A database error that
# is not a deadlock stops the retries, leaving it on the handle for the
# caller to inspect. Returns the last attempt's result.
sub retry_on_deadlock {
    my ($self, $code, $tries) = @_;
    $tries ||= 3;
    croak("deadlock retries must be positive") if $tries < 1;

    my $rv;
    ATTEMPT:
    while ($tries-- > 0) {
        $rv = eval { $code->() };
        next ATTEMPT if $self->was_deadlock_error;
        croak($@) if $@ && !$self->dbh->err;
        last ATTEMPT;
    }
    return $rv;
}
# --------------------------------------------------------------------------
# Table registry consulted by setup_database().

# Extra table names registered through add_extra_tables(); setup_database()
# creates them after the core TABLES.
my @extra_tables;

# Register additional tables to create. Each name needs a matching
# TABLE_<name> method, as create_table() calls "TABLE_$table".
# Returns the new number of registered extra tables.
sub add_extra_tables {
    my ($class, @names) = @_;
    return push @extra_tables, @names;
}

# Core tables, in the order setup_database() creates them.
use constant TABLES => qw(
    domain class file tempfile file_to_delete
    unreachable_fids file_on file_on_corrupt host
    device server_settings file_to_replicate
    file_to_delete_later fsck_log file_to_queue
    file_to_delete2
);
# Install or upgrade the schema to SCHEMA_VERSION.
#
# Returns 1 immediately when already current. Dies when the database is
# newer than this code knows about. Asks for confirmation before upgrading
# an existing schema (a confirmation refusal dies inside confirm()).
# It then creates every missing table (core TABLES first, then extras) and
# runs each upgrade hook in order. Returns 1.
sub setup_database {
    my $sto = shift;

    my $current = $sto->schema_version;
    my $latest  = SCHEMA_VERSION;

    if ($current == $latest) {
        $sto->status("Schema already up-to-date at version $current.");
        return 1;
    }

    if ($current > $latest) {
        die "Your current schema version is $current, but this version of mogdbsetup only knows up to $latest. Aborting to be safe.\n";
    }

    $sto->confirm("Install/upgrade your schema from version $current to version $latest?")
        if $current;

    for my $table (TABLES, @extra_tables) {
        $sto->create_table($table);
    }

    # Upgrade hooks, run in this order.
    for my $upgrade (qw(
        upgrade_add_host_getport
        upgrade_add_host_altip
        upgrade_add_device_asof
        upgrade_add_device_weight
        upgrade_add_device_readonly
        upgrade_add_device_drain
        upgrade_add_class_replpolicy
        upgrade_modify_server_settings_value
        upgrade_add_file_to_queue_arg
    )) {
        $sto->$upgrade;
    }

    return 1;
}
# schema_version(), memoized on the object. A result of 0 is not cached,
# so it is looked up again on the next call.
sub cached_schema_version {
    my $self = shift;
    $self->{_cached_schema_version} ||= $self->schema_version;
    return $self->{_cached_schema_version};
}

# Installed schema version from server_settings, or 0 when it is unset or
# the query fails (e.g. the table does not exist yet).
sub schema_version {
    my $self = shift;
    my $dbh  = $self->dbh;
    my $version = eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'");
    };
    return $version || 0;
}
# Hook for subclasses to rewrite CREATE TABLE SQL for their dialect;
# the base version returns it unchanged.
sub filter_create_sql { return $_[1]; }

# Create $table unless it already exists (returns 1 in that case).
# The DDL comes from the TABLE_<name> method and is passed through
# filter_create_sql(). Statements from an INDEXES_<name> method are then
# run if such a method exists; a missing one is ignored. Every statement
# is reported through status(). Dies if any statement fails.
sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);

    my $create_meth = "TABLE_$table";
    my $ddl = $self->$create_meth;
    $ddl = $self->filter_create_sql($ddl);
    $self->status("Running SQL: $ddl;");
    $dbh->do($ddl)
        or die "Failed to create table $table: " . $dbh->errstr;

    my $index_meth = "INDEXES_$table";
    my @index_ddl = eval { $self->$index_meth };
    for my $index_sql (@index_ddl) {
        $self->status("Running SQL: $index_sql;");
        $dbh->do($index_sql)
            or die "Failed to create indexes on $table: " . $dbh->errstr;
    }
}
# Schema DDL: one TABLE_<name> method per table, each returning a single
# CREATE TABLE statement as a string. create_table() looks these up as
# "TABLE_$table" and passes the result through filter_create_sql(), so
# subclasses can adapt the MySQL-flavoured text below.
#
# NOTE: everything between the double quotes is SQL, including the
# '# ...' remarks inside TABLE_file, which are sent to the database as-is.
#
# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.

sub TABLE_domain {
    # classes are tied to domains.  domains can have classes of items
    # with different mindevcounts.
    #
    # a minimum devcount is the number of copies the system tries to
    # maintain for files in that class
    #
    # unspecified classname means classid=0 (implicit class), and that
    # implies mindevcount=2
    "CREATE TABLE domain (
    dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
    namespace VARCHAR(255),
    UNIQUE (namespace)
    )"
}

# Classes within a domain, keyed by (dmid, classid), with a per-domain
# unique name and the mindevcount described above.
sub TABLE_class {
    "CREATE TABLE class (
    dmid SMALLINT UNSIGNED NOT NULL,
    classid TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname VARCHAR(50),
    UNIQUE (dmid,classname),
    mindevcount TINYINT UNSIGNED NOT NULL
    )"
}

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  application can handle that.
#
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
# each domain can setup classes and assign the minimum redundancy level for
# each class.  fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
sub TABLE_file {
    "CREATE TABLE file (
    fid INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),

    dmid SMALLINT UNSIGNED NOT NULL,
    dkey VARCHAR(255), # domain-defined
    UNIQUE dkey (dmid, dkey),

    length BIGINT UNSIGNED, # big limit

    classid TINYINT UNSIGNED NOT NULL,
    devcount TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)
    )"
}

# Rows created by register_tempfile(). The AUTO_INCREMENT fid is where new
# fids are allocated when the caller does not supply one; devids holds the
# devices picked for the file.
sub TABLE_tempfile {
    "CREATE TABLE tempfile (
    fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (fid),

    createtime INT UNSIGNED NOT NULL,
    classid TINYINT UNSIGNED NOT NULL,
    dmid SMALLINT UNSIGNED NOT NULL,
    dkey VARCHAR(255),
    devids VARCHAR(60)
    )"
}

# files marked for death when their key is overwritten.  then they get a new
# fid, but since the old row (with the old fid) had to be deleted immediately,
# we need a place to store the fid so an async job can delete the file from
# all devices.
sub TABLE_file_to_delete {
    "CREATE TABLE file_to_delete (
    fid INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid)
    )"
}

# if the replicator notices that a fid has no sources, that file gets inserted
# into the unreachable_fids table.  it is up to the application to actually
# handle fids stored in this table.
sub TABLE_unreachable_fids {
    "CREATE TABLE unreachable_fids (
    fid INT UNSIGNED NOT NULL,
    lastupdate INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),
    INDEX (lastupdate)
    )"
}

# what files are on what devices?  (most likely physical devices,
# as logical devices of RAID arrays would be costly, and mogilefs
# already handles redundancy)
#
# the devid index lets us answer "What files were on this now-dead disk?"
sub TABLE_file_on {
    "CREATE TABLE file_on (
    fid INT UNSIGNED NOT NULL,
    devid MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid),
    INDEX (devid)
    )"
}

# if application or framework detects an error in one of the duplicate files
# for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up w/ an async job
# MAYBE: let application tell us the SHA1/MD5 of the file for us to check
#        on the other devices?
sub TABLE_file_on_corrupt {
    "CREATE TABLE file_on_corrupt (
    fid INT UNSIGNED NOT NULL,
    devid MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid)
    )"
}

# hosts (which contain devices...)
sub TABLE_host {
    "CREATE TABLE host (
    hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

    status ENUM('alive','dead','down'),
    http_port MEDIUMINT UNSIGNED DEFAULT 7500,
    http_get_port MEDIUMINT UNSIGNED,

    hostname VARCHAR(40),
    hostip VARCHAR(15),
    altip VARCHAR(15),
    altmask VARCHAR(18),
    UNIQUE (hostname),
    UNIQUE (hostip),
    UNIQUE (altip)
    )"
}

# disks...
# NOTE(review): the status ENUM here lacks the 'drain' state that schema
# version 9 mentions; presumably upgrade_add_device_drain() adds it in each
# database subclass. Confirm.
sub TABLE_device {
    "CREATE TABLE device (
    devid MEDIUMINT UNSIGNED NOT NULL,
    hostid MEDIUMINT UNSIGNED NOT NULL,

    status ENUM('alive','dead','down'),
    weight MEDIUMINT DEFAULT 100,

    mb_total MEDIUMINT UNSIGNED,
    mb_used MEDIUMINT UNSIGNED,
    mb_asof INT UNSIGNED,
    PRIMARY KEY (devid),
    INDEX (status)
    )"
}

# Key/value server settings (also holds 'schema_version'). value is TEXT
# since schema version 13.
sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field VARCHAR(50) PRIMARY KEY,
    value TEXT
    )"
}

sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unix timestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    fid INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry INT UNSIGNED NOT NULL,
    INDEX (nexttry),
    fromdevid INT UNSIGNED,
    failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
    )"
}

# fids queued for deletion no earlier than delafter (presumably a unix
# timestamp, like nexttry above; confirm against the writers).
sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    fid INT UNSIGNED NOT NULL PRIMARY KEY,
    delafter INT UNSIGNED NOT NULL,
    INDEX (delafter)
    )"
}

# fsck event log (added in schema version 8): one row per event code,
# optionally tied to a fid and/or device, indexed by utime.
sub TABLE_fsck_log {
    "CREATE TABLE fsck_log (
    logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (logid),
    utime INT UNSIGNED NOT NULL,
    fid INT UNSIGNED NULL,
    evcode CHAR(4),
    devid MEDIUMINT UNSIGNED,
    INDEX(utime)
    )"
}

# generic queue table, designed to be used for workers/jobs which aren't
# constantly in use, and are async to the user.
# ie; fsck, drain, rebalance.
sub TABLE_file_to_queue {
    "CREATE TABLE file_to_queue (
    fid INT UNSIGNED NOT NULL,
    devid INT UNSIGNED,
    type TINYINT UNSIGNED NOT NULL,
    nexttry INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    flags SMALLINT UNSIGNED NOT NULL default '0',
    arg TEXT,
    PRIMARY KEY (fid, type),
    INDEX type_nexttry (type,nexttry)
    )"
}

# new style async delete table.
# this is separate from file_to_queue since deletes are more actively used,
# and partitioning on 'type' doesn't always work so well.
sub TABLE_file_to_delete2 {
    "CREATE TABLE file_to_delete2 (
    fid INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry INT UNSIGNED NOT NULL,
    failcount TINYINT UNSIGNED NOT NULL default '0',
    INDEX nexttry (nexttry)
    )"
}
# Schema upgrade hooks, run in order by setup_database().
#
# The first five are only needed for MySQL: no other database was
# supported before these columns existed, so other databases create the
# tables correctly to begin with and the base class treats these hooks as
# no-ops.
#
# The next three must be implemented by every database subclass; the base
# versions die. In the future, new alters may likewise need non-MySQL
# implementations.
sub upgrade_add_host_getport    { return 1; }
sub upgrade_add_host_altip      { return 1; }
sub upgrade_add_device_asof     { return 1; }
sub upgrade_add_device_weight   { return 1; }
sub upgrade_add_device_readonly { return 1; }

sub upgrade_add_device_drain {
    my $self = shift;
    die "Not implemented in $self";
}

sub upgrade_modify_server_settings_value {
    my $self = shift;
    die "Not implemented in $self";
}

sub upgrade_add_file_to_queue_arg {
    my $self = shift;
    die "Not implemented in $self";
}

# Add class.replpolicy (schema version 10) when the column is missing.
sub upgrade_add_class_replpolicy {
    my $self = shift;
    unless ($self->column_type("class", "replpolicy")) {
        $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
    }
}
# Delete a host row. Returns do()'s row-count result: true if deleted, 0
# (as "0E0") if it didn't exist; error handling is whatever the handle's
# RaiseError setting gives.
sub delete_host {
    my ($self, $hostid) = @_;
    my $sql = "DELETE FROM host WHERE hostid = ?";
    return $self->dbh->do($sql, undef, $hostid);
}

# Delete a domain row; same return convention as delete_host().
sub delete_domain {
    my ($self, $dmid) = @_;
    my $sql = "DELETE FROM domain WHERE dmid = ?";
    return $self->dbh->do($sql, undef, $dmid);
}

# 1 if any file belongs to domain $dmid, else 0.
sub domain_has_files {
    my ($self, $dmid) = @_;
    my $found = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                            undef, $dmid);
    return 0 unless $found;
    return 1;
}

# 1 if any file in domain $dmid has class $clid, else 0.
sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $found = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                            undef, $dmid, $clid);
    return 0 unless $found;
    return 1;
}
# Create class $classname in domain $dmid with mindevcount 2.
# Returns the new classid (current max classid in the domain + 1, so the
# first class is 1). Throws 'dup' on a duplicate name; dies on other
# failures. Computing the id from MAX(classid) is racy; override this for
# a safer version.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    my $maxid = $dbh->selectrow_array('SELECT MAX(classid) FROM class WHERE dmid = ?',
                                      undef, $dmid) || 0;
    my $new_id = $maxid + 1;

    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $new_id, $classname, 2);
    };
    throw("dup") if ($@ || $dbh->err) && $self->was_duplicate_error;

    return $new_id if $rv;
    $self->condthrow;
    die;
}
788 undef, $arg{classname}, $arg{dmid}, $arg{classid});
790 throw("dup") if $self->was_duplicate_error;
791 $self->condthrow;
792 return 1;
795 # return 1 on success, die otherwise
796 sub update_class_mindevcount {
797 my $self = shift;
798 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
799 eval {
800 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
801 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
803 $self->condthrow;
804 return 1;
807 # return 1 on success, die otherwise
808 sub update_class_replpolicy {
809 my $self = shift;
810 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
811 eval {
812 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
813 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
815 $self->condthrow;
816 return 1;
# Count files in domain $dmid / class $classid that have exactly $devcount
# copies.
sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    my $sql = 'SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?';
    return $self->dbh->selectrow_array($sql, undef, $dmid, $classid, $devcount);
}
# Store $val under $key in server_settings, or delete the key when $val is
# undef. Requires REPLACE support (subclasses without it must override).
# Returns 1; dies on a database error.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement set_server_setting!";

    eval {
        if (!defined $val) {
            $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
        }
        else {
            $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
        }
    };

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}

# Add $val (default 1) to a numeric setting, creating it with value $val
# if no row was updated. A false $val does nothing.
# FIXME: racy. currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    my $updated = $self->dbh->do("UPDATE server_settings SET value=value+? WHERE field=?",
                                 undef, $val, $key);
    return 1 if $updated > 0;
    $self->set_server_setting($key, $val);
}

# Read one setting's value (no caching).
sub server_setting {
    my ($self, $key) = @_;
    my $sql = "SELECT value FROM server_settings WHERE field=?";
    return $self->dbh->selectrow_array($sql, undef, $key);
}
# Generic server setting cache: return $key's value, re-reading it via
# server_setting() once $timeout seconds have passed since the last read.
# Note that you can call the same setting with different timeouts, but the
# timeout given on the call that refreshes the entry is the one that wins.
sub server_setting_cached {
    my ($self, $key, $timeout) = @_;
    my $entry = $self->{server_setting_cache}{$key} ||= { val => '', refresh => 0 };
    my $now = time();
    if ($entry->{refresh} < $now) {
        $entry->{val}     = $self->server_setting($key);
        $entry->{refresh} = $now + $timeout;
    }
    return $entry->{val};
}

# Return a hashref of every field => value in server_settings.
sub server_settings {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;
    my %settings;
    while (my ($field, $value) = $sth->fetchrow_array) {
        $settings{$field} = $value;
    }
    return \%settings;
}
# register a tempfile and return the fidid, which should be allocated
# using autoincrement/sequences if the passed in fid is false (undef or 0).
# however, if a true fid is passed in, that value should be used and
# returned.
#
# Named args (checked by _valid_params): fid, dmid, key, classid, devids.
#
# return new/passed in fidid on success.
# throw 'dup' if fid already in use
# return 0/undef/die on failure
sub register_tempfile {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    # Only a true fid counts as caller-chosen; undef/0 mean "allocate one".
    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    #
    # Returns 1 on success (leaving the row's fid in $fid), undef on a
    # duplicate-key error; dies on any other database error.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?',    '?',    '?',       '?',      $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Do not check for $explicit_fid_used, but rather $fid directly
            # as this anonymous sub is called from the loop later
            if ($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (" . join(',', @keys) . ") VALUES (" . join(',', @vars) . ")";
            $dbh->do($sql, undef, @vals);
        };
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        # The fid column was left out above whenever $fid was false, so the
        # database generated one: fetch it. (Testing "defined $fid" here kept
        # a passed-in 0, which made a successful insert report failure.)
        unless ($fid) {
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # if the fid is in use, do something
    while ($fid_in_use->($fid)) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
# Hashref of columns fid, dmid, dkey, length, classid, devcount for the
# file with key $key in domain $dmid, or undef if there is no such row.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount FROM file WHERE dmid=? AND dkey=?";
    return $self->dbh->selectrow_hashref($sql, undef, $dmid, $key);
}

# Same columns as above for fid $fidid, or undef if there is no such row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount FROM file WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}

# Arrayref of row hashrefs (same columns) for fids $fromfid..$tofid
# inclusive.
# NOTE(review): the original comment said "undef if no rows", but this
# returns fetchall_arrayref's result, presumably an empty arrayref when
# nothing matches. Confirm.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $tofid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount FROM file WHERE fid BETWEEN ? AND ?";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute($fromfid, $tofid);
    return $sth->fetchall_arrayref({});
}
# Return the list of devids that fid $fidid is on (empty if none).
sub fid_devids {
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };
}

# Return a hashref of { $fidid => [ $devid, $devid... ] } for the given
# @fidids. Fids with no file_on rows are absent from the result. An empty
# @fidids returns {} without querying, since "IN ()" is invalid SQL.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};
    return $ret unless @fidids;

    # Numify each id before interpolating, so only numbers reach the SQL.
    my $in  = join(",", map { $_ + 0 } @fidids);
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{ $ret->{$fidid} ||= [] }, $devid;
    }
    return $ret;
}
# Hashref of columns classid, dmid, dkey for tempfile $fidid, or undef if
# there is no such row.
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT classid, dmid, dkey FROM tempfile WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
# Insert a device row for $devid on $hostid with the given $status.
# Returns 1 on success.  A duplicate devid is reported through conddup
# (documented as throwing "dup"); any other failure dies.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $inserted = $self->conddup(sub {
        return $self->dbh->do(
            "INSERT INTO device (devid, hostid, status) VALUES (?,?,?)",
            undef, $devid, $hostid, $status);
    });
    $self->condthrow;
    $inserted > 0 or die "error making device $devid\n";
    return 1;
}
# Record a device's total/used megabytes and stamp mb_asof with the
# database clock.  Named args (validated): mb_total, mb_used, devid.
sub update_device_usage {
    my ($self, @args) = @_;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @args);
    eval {
        my $sql = "UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = "
                . $self->unix_timestamp
                . " WHERE devid = ?";
        $self->dbh->do($sql, undef, @arg{qw(mb_total mb_used devid)});
    };
    $self->condthrow;
}
# Record $fidid in unreachable_fids with the current database time,
# replacing any earlier entry.  Only stores with REPLACE support can use
# this base implementation.
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!";
    my $dbh = $self->dbh;
    my $sql = "REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")";
    $dbh->do($sql, undef, $fidid);
}
# Set the weight column of device $devid; DB errors surface via condthrow.
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    my $sql = 'UPDATE device SET weight = ? WHERE devid = ?';
    eval { $self->dbh->do($sql, undef, $weight, $devid); };
    $self->condthrow;
}
# Set the status column of device $devid; DB errors surface via condthrow.
sub set_device_state {
    my ($self, $devid, $state) = @_;
    my $sql = 'UPDATE device SET status = ? WHERE devid = ?';
    eval { $self->dbh->do($sql, undef, $state, $devid); };
    $self->condthrow;
}
# Remove class $cid from domain $dmid; DB errors surface via condthrow.
sub delete_class {
    my ($self, $dmid, $cid) = @_;
    my $sql = "DELETE FROM class WHERE dmid = ? AND classid = ?";
    eval { $self->dbh->do($sql, undef, $dmid, $cid); };
    $self->condthrow;
}
# Drop $fidid from the file and tempfile tables, then queue it in
# file_to_delete2 (due immediately) so its stored copies get removed.
# condthrow runs after every step, so a DB error stops the sequence.
sub delete_fidid {
    my ($self, $fidid) = @_;
    for my $table (qw(file tempfile)) {
        eval { $self->dbh->do("DELETE FROM $table WHERE fid=?", undef, $fidid); };
        $self->condthrow;
    }
    $self->enqueue_for_delete2($fidid, 0);
    $self->condthrow;
}
# Delete the tempfile row for $fidid.  Returns the DBI affected-row count
# (the true-but-zero "0E0" when no row matched); DB errors surface via
# condthrow.
sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    my $deleted = eval {
        $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    };
    $self->condthrow;
    return $deleted;
}
# Load the tempfile row for $fidid, then delete it.  If our DELETE removed
# the row we got here first and the row (hashref of classid, dmid, dkey) is
# returned; otherwise someone else beat us here and we return undef (an
# empty list in list context).
sub delete_and_return_tempfile_row {
    my ($self, $fidid) = @_;
    my $row = $self->tempfile_row_from_fid($fidid);
    my $rows_deleted = $self->delete_tempfile_row($fidid);
    # DBI reports zero rows as "0E0": true, but numerically 0.  Guard
    # against undef so a failed delete does not warn.
    return $row if $rows_deleted && $rows_deleted > 0;
    # Explicit return: falling off the end returned the false comparison
    # result (a one-element list) instead of undef.
    return;
}
# Insert or overwrite the file row for a fid, resetting devcount to 0.
# Named args (validated): fidid, dmid, key, length, classid.  Only stores
# with REPLACE support can use this base implementation.
sub replace_into_file {
    my ($self, @args) = @_;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @args);
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement replace_into_file!";
    my @binds = @arg{qw(fidid dmid key length classid)};
    eval {
        $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) "
                       . "VALUES (?,?,?,?,?,0) ", undef, @binds);
    };
    $self->condthrow;
}
# Change the key (dkey) of $fidid to $to_key.
# Returns 1 on success and 0 when the new key collides with an existing
# one (a duplicate-key error, as judged by was_duplicate_error); dies on
# any other database error.
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        # Rethrow a raised exception unchanged.  If the error was only
        # flagged on the handle ($@ empty), fall through to condthrow so the
        # caller sees the DB error instead of a bare "Died".
        die $@ if $@;
    }
    $self->condthrow;
    return 1;
}
# Return every domain as a flat (namespace => dmid, ...) list, ready to be
# assigned to a hash.  Empty when the query yields nothing.
sub get_all_domains {
    my ($self) = @_;
    my $rows = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain') || [];
    return map { @{$_}[0, 1] } @$rows;
}
# Return one hashref per row of the 'class' table (dmid, classid,
# classname, mindevcount, plus replpolicy once the schema version is at
# least 10, the version that added that column).
sub get_all_classes {
    my ($self) = @_;
    my $repl_col = $self->cached_schema_version >= 10 ? ", replpolicy" : "";

    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
    $sth->execute;
    my @classes;
    while (my $class_row = $sth->fetchrow_hashref) {
        push @classes, $class_row;
    }
    return @classes;
}
# Record that $fidid has a copy on $devid.
# Returns 1 if a row was written and 0 otherwise (e.g. the row already
# existed and was ignored).  Croaks on a false fidid or devid.
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $sql = $self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)";
    my $rv = $self->dowell($sql, undef, $fidid, $devid);
    return $rv > 0 ? 1 : 0;
}
# Forget that $fidid has a copy on $devid.
# Returns the DBI affected-row count: 1 when a row was removed, the
# true-but-zero "0E0" when there was nothing to remove.  DB errors surface
# via condthrow.
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $sql = "DELETE FROM file_on WHERE fid=? AND devid=?";
    my $removed = eval { $self->dbh->do($sql, undef, $fidid, $devid); };
    $self->condthrow;
    return $removed;
}
# Return every host row as a list of hashrefs (hostid, status, hostname,
# hostip, http_port, http_get_port, altip, altmask).  The /*!40000 ... */
# comment is a MySQL-only query-cache hint that other databases ignore.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, "
                                  . "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @hosts;
    while (my $host = $sth->fetchrow_hashref) {
        push @hosts, $host;
    }
    return @hosts;
}
# Return every device row as a list of hashrefs (devid, hostid, mb_total,
# mb_used, mb_asof, status, weight).  The /*!40000 ... */ comment is a
# MySQL-only query-cache hint.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, "
                                  . "mb_used, mb_asof, status, weight FROM device");
    $self->condthrow;
    $sth->execute;
    my @devices;
    while (my $device = $sth->fetchrow_hashref) {
        push @devices, $device;
    }
    return @devices;
}
# Recount the file_on rows for $fidid and store the result in
# file.devcount.  Returns 1; DB errors on the update surface via condthrow.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $copies = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                       undef, $fidid);
    eval {
        $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef, $copies, $fidid);
    };
    $self->condthrow;
    return 1;
}
# Move $fidid into class $classid.  Returns 1; DB errors surface via
# condthrow (or directly from DBI when RaiseError is on).
sub update_classid {
    my ($self, $fidid, $classid) = @_;
    my $sql = "UPDATE file SET classid=? WHERE fid=?";
    $self->dbh->do($sql, undef, $classid, $fidid);
    $self->condthrow;
    return 1;
}
# Queue $fidid for replication, optionally from $from_devid (may be
# undef), becoming due $in seconds from now (default 0).  Existing queue
# entries are left alone (insert_ignore); retried on deadlock.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;
    my $delay   = int($in || 0);
    my $nexttry = $self->unix_timestamp . " + $delay";

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) "
                             . "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
    });
}
# Queue $fidid in file_to_delete2, due $in seconds from now (default 0).
# Already-queued fids are left alone (insert_ignore); retried on deadlock.
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# some redundancy.
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;
    my $delay   = int($in || 0);
    my $nexttry = $self->unix_timestamp . " + $delay";

    $self->retry_on_deadlock(sub {
        $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) "
                             . "VALUES (?,$nexttry)", undef, $fidid);
    });
}
# Queue one unit of work of $type in file_to_queue, due $in seconds from
# now (default 0).  $fidid is either a plain fidid or an arrayref
# [fid, devid, arg].  Existing entries are left alone; retried on deadlock.
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;
    my $nexttry = $self->unix_timestamp . " + " . int($in || 0);

    $self->retry_on_deadlock(sub {
        unless (ref($fidid)) {
            return $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) "
                                        . "VALUES (?,?,$nexttry)", undef, $fidid, $type);
        }
        my ($fid, $devid, $arg) = @$fidid;
        return $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, "
                                    . "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
                                    $fid, $devid, $arg, $type);
    });
}
# Queue many units of work of $type into file_to_queue, due $in seconds
# from now (default 0).  @$fidids holds either [fid, devid, arg] arrayrefs,
# or hashrefs with a "fid" key (plain fidids are accepted too); the first
# element decides which form the batch uses.  Existing entries are left
# alone.  Returns 1 on success; dies otherwise.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    # Nothing to do; an INSERT with no VALUES rows would be invalid SQL.
    return 1 unless $fidids && @$fidids;

    # multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub, so go one row at a time when it is not supported.
    if (@$fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        foreach my $item (@$fidids) {
            # enqueue_for_todo takes a plain fidid or a [fid, devid, arg]
            # arrayref, so unwrap hashref rows to their fid.
            my $todo = ref($item) eq 'HASH' ? $item->{fid} : $item;
            $self->enqueue_for_todo($todo, $type, $in);
        }
        return 1;
    }

    $in = 0 unless $in;
    # $nexttry is an SQL expression (DB clock + delay), so it has to be
    # inlined into the statement; binding it would store the literal text.
    my $nexttry = $self->unix_timestamp . " + " . int($in);

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        if (ref($fidids->[0]) eq 'ARRAY') {
            my $sql = $self->ignore_replace .
                " INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES " .
                join(', ', ("(?,?,?,?,$nexttry)") x scalar @$fidids);
            # always bind exactly fid, devid, arg per row (missing ones as NULL)
            $self->dbh->do($sql, undef, map { @{$_}[0 .. 2], $type } @$fidids);
        } else {
            my $sql = $self->ignore_replace .
                " INTO file_to_queue (fid, type, nexttry) VALUES " .
                join(",", map {
                    my $fid = ref($_) eq 'HASH' ? $_->{fid} : $_;
                    "(" . int($fid) . ", $type, $nexttry)";
                } @$fidids);
            $self->dbh->do($sql);
        }
    });
    $self->condthrow;
    return 1;
}
# Count the file_to_queue rows of a given $type.  A full COUNT(*) is not
# fast, but it is acceptable for queues that are meant to stay small, and
# it is usually only run from a single tracker.
sub file_queue_length {
    my ($self, $type) = @_;
    my $sql = "SELECT COUNT(*) FROM file_to_queue WHERE type = ?";
    return $self->dbh->selectrow_array($sql, undef, $type);
}
# Pull every deferred replication (nexttry in the future) forward to now.
# Returns whatever retry_on_deadlock yields for the UPDATE (presumably the
# DBI row count, i.e. the number rescheduled; TODO confirm).
sub replicate_now {
    my ($self) = @_;
    $self->retry_on_deadlock(sub {
        my $now = $self->unix_timestamp;
        return $self->dbh->do("UPDATE file_to_replicate SET nexttry = $now WHERE nexttry > $now");
    });
}
# Return an arrayref of up to $limit fidids stored on $devid.  Both
# arguments are required; $limit is inlined into the SQL, so it must be a
# trusted number.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    my $sql = "SELECT fid FROM file_on WHERE devid = ? LIMIT $limit";
    my $fidids = $self->dbh->selectcol_arrayref($sql, undef, $devid);
    return $fidids;
}
# Page through the fids on a device.  Named options:
#   devid (required), age ('old' = ascending, 'new' = descending; default
#   'old'), fidid (the last fid of the previous page, to continue after),
#   limit (default 100).
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing a new index.
# Returns an arrayref of fidids.
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh   = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete $o{age};
    my $fidid = delete $o{fidid};
    my $limit = delete $o{limit};
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # age => [comparison used when paging past $fidid, sort direction]
    my %paging_for = (
        old => ['>', 'ASC'],
        new => ['<', 'DESC'],
    );
    $age ||= 'old';
    my $rule = $paging_for{$age} or croak("invalid age argument: " . $age);
    my ($cmp, $order) = @$rule;

    # If supplied a "previous" fidid, we're paging through.
    my @binds   = ($devid);
    my $fidsort = '';
    if ($fidid) {
        $fidsort = "AND fid $cmp ?";
        push @binds, $fidid;
    }
    $limit ||= 100;

    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
                                          $fidsort . " ORDER BY fid $order LIMIT $limit",
                                          undef, @binds);
    return $fidids;
}
# Return up to $limit (default 1,000) MogileFS::FID objects for the fids
# strictly greater than $fidid, in ascending fid order.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit = int($limit || 1000);

    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount "
                                  . "FROM file WHERE fid > ? ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);

    my @fids;
    while (my $row = $sth->fetchrow_hashref) {
        push @fids, MogileFS::FID->new_from_db_row($row);
    }
    return @fids;
}
# Like get_fids_above_id, but returns plain fidids instead of FID objects:
# an arrayref of up to $limit (default 1,000) fids strictly greater than
# $fidid, in ascending order.
sub get_fidids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my $dbh = $self->dbh;
    # Bind $fidid to the placeholder.  Without bind values DBI rejects the
    # statement ("called with 0 bind variables when 1 are needed").
    my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file WHERE fid > ?
                                             ORDER BY fid LIMIT $limit}, undef, $fidid);
    return $fidids;
}
# Create a domain named $name and return its new dmid (MAX(dmid) + 1).
# Throws 'dup' when the name already exists; dies on other failures.
# Override if you want a less racy version: two callers can pick the same
# id.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    my $dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;
    return $dmid if $rv;
    die "failed to make domain"; # FIXME: the above is racy.
}
# Set column $col of host $hostid to $val.  Returns 1; duplicate-key
# errors go through conddup.  $col is interpolated as a column name, so it
# must come from trusted code, never from user input.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    my $sql = "UPDATE host SET $col=? WHERE hostid=?";
    $self->conddup(sub {
        $self->dbh->do($sql, undef, $val, $hostid);
    });
    return 1;
}
# Create a host row (initial status 'down') and return its new hostid.
# Duplicates go through conddup (documented as throwing 'dup'); any other
# failure dies with "db failure".
# The id is MAX(hostid) + 1: racy, but portable across databases.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh   = $self->dbh;
    my $maxid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0;
    my $hid   = $maxid + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) "
                 . "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    die "db failure" unless $rv;
    return $hid;
}
# Return up to $limit due file_to_replicate rows as hashrefs (fid,
# fromdevid, failcount, flags, nexttry).  Rows are keyed by fid
# internally, so the returned order is not guaranteed.  Returns an empty
# list if the query returns nothing.
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $now = $self->unix_timestamp;
    my $by_fid = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $now
        ORDER BY nexttry
        LIMIT $limit
    }, "fid");
    return () unless $by_fid;
    return values %$by_fid;
}
# "new" style queue consumption code.
# Inside one transaction, SELECT ... FOR UPDATE up to $limit due rows from
# $queue, then push each claimed fid's nexttry 1000 seconds into the
# future so local workers have time to dequeue the items.
#   $queue     - queue table name (inlined into SQL; trusted callers only)
#   $limit     - max rows (inlined into SQL)
#   $extfields - optional extra columns beyond fid, nexttry, failcount
#   $extwhere  - optional extra WHERE text, e.g. "AND type = 3"
# Returns the list of row hashrefs, or an empty list on deadlock (after
# rolling back) so the job master can retry.  Other DB errors are raised
# via condthrow.
# Note:
# DBI (even with RaiseError) returns weird errors on
# deadlocks from selectall_hashref. So we can't do that.
# we also used to retry on deadlock within the routine,
# but instead lets return undef and let job_master retry.
sub grab_queue_chunk {
    my ($self, $queue, $limit, $extfields, $extwhere) = @_;
    $extwhere ||= '';

    my $dbh = $self->dbh;
    my $work;

    my $fields = 'fid, nexttry, failcount';
    $fields .= ', ' . $extfields if $extfields;
    eval {
        $dbh->begin_work;
        my $ut = $self->unix_timestamp;
        my $sth = $dbh->prepare(qq{
            SELECT $fields
            FROM $queue
            WHERE nexttry <= $ut
            $extwhere
            ORDER BY nexttry
            LIMIT $limit
            FOR UPDATE
        });
        $sth->execute;
        $work = $sth->fetchall_hashref('fid');
        # Nothing to work on: finish the transaction.  This return only
        # leaves the eval block, not the sub.
        my $fidlist = join(',', keys %$work);
        unless ($fidlist) { $dbh->commit; return; }
        # Now claim the fids for a while.
        # TODO: Should be configurable... but not necessary.
        $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
        $dbh->commit;
    };
    if ($self->was_deadlock_error) {
        eval { $dbh->rollback };
        return ();
    }
    # NOTE(review): non-deadlock errors are raised without a rollback, so
    # the transaction opened by begin_work may stay open; confirm whether
    # condthrow or the caller handles that.
    $self->condthrow;

    return defined $work ? values %$work : ();
}
# Claim up to $limit due file_to_replicate rows, including their
# fromdevid and flags columns (see grab_queue_chunk).
sub grab_files_to_replicate {
    my ($self, $limit) = @_;
    my $extra_cols = 'fromdevid, flags';
    return $self->grab_queue_chunk('file_to_replicate', $limit, $extra_cols);
}
# Claim up to $limit due file_to_delete2 rows (see grab_queue_chunk).
sub grab_files_to_delete2 {
    my ($self, $limit) = @_;
    my $queue = 'file_to_delete2';
    return $self->grab_queue_chunk($queue, $limit);
}
# Claim up to $limit due file_to_queue rows of the given $type (see
# grab_queue_chunk).  $what lists the extra columns to fetch (default
# 'type, flags').  $type is inlined into the WHERE clause.
# $extwhere is ugly... but should be fine.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    my $extfields = $what || 'type, flags';
    my $extwhere  = 'AND type = ' . $type;
    return $self->grab_queue_chunk('file_to_queue', $limit, $extfields, $extwhere);
}
# Although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, it's inefficient CPU/time-wise, and they may
# pick different places and waste disk.  So the replicator asks the store
# interface when it's about to start, and when it's done replicating a
# fidid, so a subclass can do something smart and tell it not to.
# The base class always says yes (1), after warning that it does no
# coordination.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    return 1;
}
# Hook called when the replicator has finished with a fid, so a subclass
# can clean up whatever should_begin_replicating_fidid set up.  The base
# class does nothing.
#
# NOTE: there's a theoretical race condition in the rebalance code: without
# locking in should_begin_replicating_fidid/note_done_replicating, all
# copies of a file can be deleted by independent replicators rebalancing
# in different ways.  Subclasses will probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
}
# Drop $fidid from the replication queue (retried on deadlock).
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM file_to_replicate WHERE fid=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid);
    });
}
# Drop the ($fidid, $type) entry from file_to_queue (retried on deadlock).
sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    my $sql = "DELETE FROM file_to_queue WHERE fid=? and type=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid, $type);
    });
}
# Drop $fidid from the file_to_delete2 queue (retried on deadlock).
sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    my $sql = "DELETE FROM file_to_delete2 WHERE fid=?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $fidid);
    });
}
# Reschedule a replication entry to the absolute time $abstime and bump
# its failcount (retried on deadlock).
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $abstime, $fid);
    });
}
# Reschedule a replication entry to $in_n_secs from the DB's "now" and
# bump its failcount (retried on deadlock).
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        my $sql = "UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, "
                . "failcount = failcount + 1 WHERE fid = ?";
        $self->dbh->do($sql, undef, $in_n_secs, $fid);
    });
}
# Reschedule a file_to_delete2 entry to the absolute time $abstime and
# bump its failcount (retried on deadlock).
sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, $abstime, $fid);
    });
}
# Reschedule a file_to_delete2 entry to $in_n_secs from the DB's "now"
# and bump its failcount (retried on deadlock).
sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->retry_on_deadlock(sub {
        my $sql = "UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, "
                . "failcount = failcount + 1 WHERE fid = ?";
        $self->dbh->do($sql, undef, $in_n_secs, $fid);
    });
}
# Return an arrayref of up to $limit dkeys in domain $dmid that start with
# $prefix and sort after $after, ordered by dkey.  $prefix and $after are
# optional (undef means "any" / "from the beginning").
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    $prefix = '' unless defined $prefix;
    $after  = '' unless defined $after;

    # Match the prefix literally: escape the LIKE wildcards (% and _) and
    # the escape character itself, so "foo_" does not also match "fooX".
    # Then append the % that turns this into a prefix match.
    (my $pattern = $prefix) =~ s/([%_\\])/\\$1/g;
    $pattern .= '%';

    # $limit is inlined into the SQL, so force it to an integer.
    $limit = int($limit);

    # The escape character is bound rather than written as a literal '\',
    # which MySQL would parse as a string escape.
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? ESCAPE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $pattern, '\\', $after);
}
# Return an arrayref of up to 50 tempfile rows, each an arrayref
# [$fidid, $devids], that were created more than $secs_old seconds ago.
# $secs_old is inlined into the SQL, so it must be a trusted number.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    my $cutoff = $self->unix_timestamp . " - $secs_old";
    return $self->dbh->selectall_arrayref(
        "SELECT fid, devids FROM tempfile WHERE createtime < $cutoff LIMIT 50");
}
# Record many (fidid, devid) copies at once from MogileFS::DevFID objects,
# ignoring ones already present.  Croaks on a false fidid or devid.
# Returns 1.  Stores without multi-row INSERT get one row per statement.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) for @devfids;
        return 1;
    }

    my @binds = map {
        my ($fidid, $devid) = ($_->fidid, $_->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        ($fidid, $devid);
    } @devfids;
    my $values = join(',', ("(?,?)") x scalar @devfids);

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . $values,
                  undef, @binds);
    return 1;
}
# Persist $ver (truncated to an integer) as the "schema_version" server
# setting.  The misspelled name is kept so existing callers keep working.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    my $version = int($ver);
    $self->set_server_setting("schema_version", $version);
}
# Return up to 500 fidids from file_to_delete_later whose delafter time
# has passed, so their deletion can be retried.
sub fids_to_delete_again {
    my ($self) = @_;
    my $now = $self->unix_timestamp;
    my $fids = $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $now
        LIMIT 500
    });
    return @{ $fids || [] };
}
# Queue @fidids into file_to_delete, ignoring ones already queued.
# Returns 1 on success; dies otherwise.  An empty list is a no-op, since a
# multi-row INSERT with no VALUES rows would be invalid SQL.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    return 1 unless @fidids;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# Queue @fidids into file_to_delete2, due immediately, ignoring ones
# already queued.  Returns 1 on success; dies otherwise.  An empty list is
# a no-op, since a multi-row INSERT with no VALUES rows would be invalid SQL.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    return 1 unless @fidids;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    # SQL expression for the DB's current time; inlined, not bound.
    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid, nexttry) VALUES " .
                       join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
    return 1;
}
# Empty the fsck_log table.  Returns 1; with RaiseError on, a failing
# DELETE dies out of DBI.
sub clear_fsck_log {
    my ($self) = @_;
    my $dbh = $self->dbh;
    $dbh->do("DELETE FROM fsck_log");
    return 1;
}
# Fold fsck_log rows that have not been summarized yet into the per-evcode
# counters (server settings "fsck_sum_evcount_<evcode>"), then advance
# "fsck_logid_processed".  This keeps fsck_status fast by avoiding a
# potentially huge GROUP BY over the whole log.
# Guarded by the 'mgfs:fscksum' lock: returns 0 if the lock could not be
# obtained within 10 seconds.  If get_lock itself dies (e.g. a store
# without lock support), the summary runs unlocked.  Errors from the
# summary are rethrown after the lock is released.
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    # Run the work inside eval so the lock is always released.  A leaked
    # lock would make every later get_lock die on lock recursion.
    my $ok = eval {
        my $logid = $self->max_fsck_logid;
        # both bounds inclusive:
        my $min_logid = ($self->server_setting("fsck_logid_processed") || 0) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]);
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
        }
        $self->set_server_setting("fsck_logid_processed", $logid);
        1;
    };
    my $err = $@;

    my $rv = $lock ? $self->release_lock($lockname) : $lock;
    die $err unless $ok;
    return $rv;
}
# Append an fsck_log row stamped with the database's current time.
# Named args: fid, code (stored as evcode), devid.  Unknown keys croak
# before anything is written.  Returns 1; DB errors surface via condthrow.
sub fsck_log {
    my ($self, %opts) = @_;
    # Validate first: croaking after the INSERT would leave a row behind.
    my @binds = map { delete $opts{$_} } qw(fid code devid);
    croak("Unknown opts") if %opts;

    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef, @binds);
    $self->condthrow;

    return 1;
}
# Ask the database for its current unix time.
sub get_db_unixtime {
    my ($self) = @_;
    my $sql = "SELECT " . $self->unix_timestamp;
    return $self->dbh->selectrow_array($sql);
}
# Return the highest fid in the file table (undef when it is empty).
sub max_fidid {
    my ($self) = @_;
    my $sql = "SELECT MAX(fid) FROM file";
    return $self->dbh->selectrow_array($sql);
}
# Return the highest logid in fsck_log, or 0 when the log is empty.
sub max_fsck_logid {
    my ($self) = @_;
    my $max = $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log");
    return $max || 0;
}
# Return up to $limit (default 100) fsck_log rows with logid greater than
# $after_logid (default 0), in logid order.  Each row is a hashref of
# logid, utime, fid, evcode, devid.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);

    my @rows;
    while (my $row = $sth->fetchrow_hashref) {
        push @rows, $row;
    }
    return @rows;
}
# Count fsck_log rows per evcode.  Exactly one filter is expected:
#   logid_range => [$min, $max]  inclusive logid bounds (wins if both given)
#   time_gte    => $utime        rows logged at or after $utime (0 allowed)
# Returns a hashref { evcode => count }.  Croaks on unknown options or when
# no filter is given (previously this died calling a method on undef).
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr = delete $opts{logid_range};
    croak("Unknown options: " . join(', ', sort keys %opts)) if %opts;

    my $sth;
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    } elsif (defined $timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte);
    } else {
        croak("fsck_evcode_counts requires time_gte or logid_range");
    }

    my $ret = {};
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}
# Hook run before daemonizing.  Subclasses may die here if something is
# amiss, or emit warnings; the base class checks nothing.
sub pre_daemonize_checks {
    return;
}
# Try to take the named lock, giving up after $timeout seconds.
# Contract for subclasses: return 1 on success and 0 on timeout, and die
# if a lock is already held (no recursion).  The base class only performs
# the recursion check and then dies as unimplemented.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    if ($self->{lock_depth}) {
        die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out.";
    }
    die "get_lock not implemented for $self";
}
# Release the named lock.
# Contract for subclasses: return 1 on success and 0 when we hold no lock
# of that name.  The base class dies as unimplemented.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}
# Return up to $limit (default 100) fidids on $devid, in shuffled order.
# Only the first 5000 file_on rows for the device are considered.
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random. and good chances these will
    # eventually get to point where they're un-rebalance-able, and we
    # never move on past the first 5000
    my $candidates = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                              undef, $devid) || [];
    my @some_fids = List::Util::shuffle(@$candidates);

    @some_fids = @some_fids[0 .. $limit - 1] if @some_fids > $limit;
    return @some_fids;
}
1898 __END__
1900 =head1 NAME
1902 MogileFS::Store - data storage provider. base class.
1904 =head1 ABOUT
MogileFS aims to be database-independent. MogileFS::Store is the base
class for the storage backends: the concrete subclass is chosen from the
DSN (L<MogileFS::Store::MySQL>, L<MogileFS::Store::SQLite>,
L<MogileFS::Store::Postgres> or L<MogileFS::Store::Oracle>), and all
database interaction goes through an instance of it.
1912 =head1 SEE ALSO
1914 L<MogileFS::Store::MySQL>