allow server setting to un-molest slave list
[MogileFS-Server.git] / lib / MogileFS / Store.pm
blob 6c5022b7619c09e25df5e76e3d70884a5c955b88
1 package MogileFS::Store;
2 use strict;
3 use warnings;
4 use Carp qw(croak);
5 use MogileFS::Util qw(throw max error);
6 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
7 use List::Util qw(shuffle);
9 # this is incremented whenever the schema changes. the server will refuse
10 # to start up with an old schema version
12 # 6: adds file_to_replicate table
13 # 7: adds file_to_delete_later table
14 # 8: adds fsck_log table
15 # 9: adds 'drain' state to enum in device table
16 # 10: adds 'replpolicy' column to 'class' table
17 # 11: adds 'file_to_queue' table
18 # 12: adds 'file_to_delete2' table
19 # 13: modifies 'server_settings.value' to TEXT for wider values
20 # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21 # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22 use constant SCHEMA_VERSION => 14;
24 sub new {
25 my ($class) = @_;
26 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
29 sub new_from_dsn_user_pass {
30 my ($class, $dsn, $user, $pass, $max_handles) = @_;
31 my $subclass;
32 if ($dsn =~ /^DBI:mysql:/i) {
33 $subclass = "MogileFS::Store::MySQL";
34 } elsif ($dsn =~ /^DBI:SQLite:/i) {
35 $subclass = "MogileFS::Store::SQLite";
36 } elsif ($dsn =~ /^DBI:Oracle:/i) {
37 $subclass = "MogileFS::Store::Oracle";
38 } elsif ($dsn =~ /^DBI:Pg:/i) {
39 $subclass = "MogileFS::Store::Postgres";
40 } else {
41 die "Unknown database type: $dsn";
43 unless (eval "use $subclass; 1") {
44 die "Error loading $subclass: $@\n";
46 my $self = bless {
47 dsn => $dsn,
48 user => $user,
49 pass => $pass,
50 max_handles => $max_handles, # Max number of handles to allow
51 raise_errors => $subclass->want_raise_errors,
52 slave_list_version => 0,
53 slave_list_cache => [],
54 recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
55 recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
56 handles_left => 0, # amount of times this handle can still be verified
57 connected_slaves => {},
58 dead_slaves => {},
59 }, $subclass;
60 $self->init;
61 return $self;
64 # Defaults to true now.
65 sub want_raise_errors {
69 sub new_from_mogdbsetup {
70 my ($class, %args) = @_;
71 # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
72 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
74 my $try_make_sto = sub {
75 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
76 PrintError => 0,
77 }) or return undef;
78 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
79 $sto->raise_errors;
80 return $sto;
83 # upgrading, apparently, as this database already exists.
84 my $sto = $try_make_sto->();
85 return $sto if $sto;
87 # otherwise, we need to make the requested database, setup permissions, etc
88 $class->status("couldn't connect to database as mogilefs user. trying root...");
89 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
90 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
91 PrintError => 0,
92 }) or
93 die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
94 $class->status("connected to database as root user.");
96 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
97 $class->create_db_if_not_exists($rdbh, $args{dbname});
98 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
99 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
101 # should be ready now:
102 $sto = $try_make_sto->();
103 return $sto if $sto;
105 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
108 # given a root DBI connection, create the named database. succeed
109 # if it's made, or already exists. die otherwise.
110 sub create_db_if_not_exists {
111 my ($pkg, $rdbh, $dbname) = @_;
112 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
113 or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
116 sub grant_privileges {
117 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
118 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
119 undef, $pass)
120 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
121 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
122 undef, $pass)
123 or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
126 sub can_replace { 0 }
127 sub can_insertignore { 0 }
128 sub can_insert_multi { 0 }
129 sub can_for_update { 1 }
131 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
133 sub ignore_replace {
134 my $self = shift;
135 return "INSERT IGNORE " if $self->can_insertignore;
136 return "REPLACE " if $self->can_replace;
137 die "Can't INSERT IGNORE or REPLACE?";
140 my $on_status = sub {};
141 my $on_confirm = sub { 1 };
142 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
143 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
144 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
145 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
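# Usage sketch (not part of this module): a setup tool such as mogdbsetup can
# hook the status/confirm callbacks before calling new_from_mogdbsetup. The
# callback bodies below are illustrative placeholders.
#
#   MogileFS::Store->on_status(sub { print "$_[0]\n" });
#   MogileFS::Store->on_confirm(sub {
#       print "$_[0] [y/N]: ";
#       chomp(my $ans = <STDIN>);
#       return lc($ans) eq 'y';   # confirm() dies with "Aborted." on false
#   });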
147 sub latest_schema_version { SCHEMA_VERSION }
149 sub raise_errors {
150 my $self = shift;
151 $self->{raise_errors} = 1;
152 $self->dbh->{RaiseError} = 1;
155 sub dsn { $_[0]{dsn} }
156 sub user { $_[0]{user} }
157 sub pass { $_[0]{pass} }
159 sub init { 1 }
160 sub post_dbi_connect { 1 }
162 sub can_do_slaves { 0 }
164 sub mark_as_slave {
165 my $self = shift;
166 die "Incapable of becoming slave." unless $self->can_do_slaves;
168 $self->{slave} = 1;
171 sub is_slave {
172 my $self = shift;
173 return $self->{slave};
176 sub _slaves_list_changed {
177 my $self = shift;
178 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
179 if ($ver <= $self->{slave_list_version}) {
180 return 0;
182 $self->{slave_list_version} = $ver;
183 # Restart connections from scratch if the configuration changed.
184 $self->{connected_slaves} = {};
185 return 1;
188 # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
189 sub _slaves_list {
190 my $self = shift;
191 my $now = time();
193 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
194 or return ();
196 my @ret;
197 foreach my $key (split /\s*,\s*/, $sk) {
198 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
200 if (!$slave) {
201 error("key for slave DB config: slave_$key not found in configuration");
202 next;
205 my ($dsn, $user, $pass) = split /\|/, $slave;
206 if (!defined($dsn) or !defined($user) or !defined($pass)) {
207 error("key slave_$key contains $slave, which doesn't split on | into DSN|user|pass - ignoring");
208 next;
210 push @ret, [$dsn, $user, $pass]
213 $self->{slave_list_cache} = \@ret;
214 return @ret;
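# Sketch of the server settings this expects; the host names and passwords
# below are placeholders:
#
#   $sto->set_server_setting("slave_keys", "db1,db2");
#   $sto->set_server_setting("slave_db1",
#       "DBI:mysql:mogilefs;host=192.0.2.11|mogile|password1");
#   $sto->set_server_setting("slave_db2",
#       "DBI:mysql:mogilefs;host=192.0.2.12|mogile|password2");
#
# get_slave() below also honors 'slave_dead_retry_timeout' (seconds, default
# 15) and skips the 'slave_list_filter' hook when 'slave_skip_filtering' is
# set to 'on'.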
217 sub _pick_slave {
218 my $self = shift;
219 my @temp = shuffle keys %{$self->{connected_slaves}};
220 return unless @temp;
221 return $self->{connected_slaves}->{$temp[0]};
224 sub get_slave {
225 my $self = shift;
226 my $now = time();
228 die "Incapable of having slaves." unless $self->can_do_slaves;
230 $self->{slave} = undef;
231 unless ($self->_slaves_list_changed) {
232 if ($self->{slave} = $self->_pick_slave) {
233 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
234 return $self->{slave} if $self->check_slave;
238 if ($self->{slave}) {
239 my $dsn = $self->{slave}->{dsn};
240 $self->{dead_slaves}->{$dsn} = $now;
241 delete $self->{connected_slaves}->{$dsn};
242 error("Error talking to slave: $dsn");
244 my @slaves_list = $self->_slaves_list;
246 # If we have no slaves, then return silently.
247 return unless @slaves_list;
249 unless (MogileFS::Config->server_setting_cached('slave_skip_filtering') eq 'on') {
250 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
253 my $dead_retry =
254 MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
256 foreach my $slave_fulldsn (@slaves_list) {
257 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
258 next if (defined $dead_timeout && $dead_timeout + $dead_retry > $now);
259 next if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
261 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
262 $self->{slave}->{next_check} = 0;
263 $newslave->mark_as_slave;
264 if ($self->check_slave) {
265 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
266 } else {
267 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
271 if ($self->{slave} = $self->_pick_slave) {
272 return $self->{slave};
274 warn "Slave list exhausted, failing back to master.";
275 return;
278 sub read_store {
279 my $self = shift;
281 return $self unless $self->can_do_slaves;
283 if ($self->{slave_ok}) {
284 if (my $slave = $self->get_slave) {
285 return $slave;
289 return $self;
292 sub slaves_ok {
293 my $self = shift;
294 my $coderef = shift;
296 return unless ref $coderef eq 'CODE';
298 local $self->{slave_ok} = 1;
300 return $coderef->(@_);
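# Usage sketch ($dmid and $dkey are placeholders): wrap read-only work in
# slaves_ok() so that read_store() may hand back a connected slave instead of
# the master.
#
#   my $row = $sto->slaves_ok(sub {
#       $sto->read_store->file_row_from_dmid_key($dmid, $dkey);
#   });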
303 sub recheck_dbh {
304 my $self = shift;
305 $self->{recheck_req_gen}++;
308 sub dbh {
309 my $self = shift;
311 if ($self->{dbh}) {
312 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
313 $self->{dbh} = undef unless $self->{dbh}->ping;
314 # Handles a memory leak under Solaris/Postgres.
315 $self->{dbh} = undef if ($self->{max_handles} &&
316 $self->{handles_left}-- < 0);
317 $self->{recheck_done_gen} = $self->{recheck_req_gen};
319 return $self->{dbh} if $self->{dbh};
322 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
323 PrintError => 0,
324 AutoCommit => 1,
325 # FUTURE: will default to on (have to validate all callers first):
326 RaiseError => ($self->{raise_errors} || 0),
327 }) or
328 die "Failed to connect to database: " . DBI->errstr;
329 $self->post_dbi_connect;
330 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
331 return $self->{dbh};
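# Usage sketch: long-running workers can call recheck_dbh() after being idle;
# the next dbh() call then pings the cached handle, reconnects if the ping
# fails, and also drops/reconnects the handle once max_handles verifications
# have been used up (the Solaris/Postgres leak workaround noted above).
#
#   $sto->recheck_dbh;
#   my $dbh = $sto->dbh;   # validated (or freshly connected) handle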
334 sub have_dbh { return 1 if $_[0]->{dbh}; }
336 sub ping {
337 my $self = shift;
338 return $self->dbh->ping;
341 sub condthrow {
342 my ($self, $optmsg) = @_;
343 my $dbh = $self->dbh;
344 return 1 unless $dbh->err;
345 my ($pkg, $fn, $line) = caller;
346 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
347 $msg .= ": $optmsg" if $optmsg;
348 # Automatically roll back on failures inside transactions.
349 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
350 croak($msg);
353 sub dowell {
354 my ($self, $sql, @do_params) = @_;
355 my $rv = eval { $self->dbh->do($sql, @do_params) };
356 return $rv unless $@ || $self->dbh->err;
357 warn "Error with SQL: $sql\n";
358 Carp::confess($@ || $self->dbh->errstr);
361 sub _valid_params {
362 croak("Odd number of parameters!") if scalar(@_) % 2;
363 my ($self, $vlist, %uarg) = @_;
364 my %ret;
365 $ret{$_} = delete $uarg{$_} foreach @$vlist;
366 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
367 return %ret;
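# Usage sketch: callers pass named parameters, which _valid_params() checks
# against the allowed list; unknown keys croak with "Bogus options: ...".
# The values below are placeholders.
#
#   $sto->update_device_usage(devid => 7, mb_total => 1_000_000, mb_used => 42);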
370 sub was_deadlock_error {
371 my $self = shift;
372 my $dbh = $self->dbh;
373 die "UNIMPLEMENTED";
376 sub was_duplicate_error {
377 my $self = shift;
378 my $dbh = $self->dbh;
379 die "UNIMPLEMENTED";
382 # run a subref (presumably a database update) in an eval; if it fails with a
383 # duplicate-key error, throw a "dup" exception for you, otherwise return
384 # its return value
385 sub conddup {
386 my ($self, $code) = @_;
387 my $rv = eval { $code->(); };
388 throw("dup") if $self->was_duplicate_error;
389 croak($@) if $@;
390 return $rv;
393 # insert row if doesn't already exist
394 # WARNING: This function is NOT transaction safe if a duplicate error causes
395 # your transaction to halt!
396 # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
397 # is false! Rows before the duplicate will be inserted, but rows after the
398 # duplicate might not be, depending on your database.
399 sub insert_ignore {
400 my ($self, $sql, @params) = @_;
401 my $dbh = $self->dbh;
402 if ($self->can_insertignore) {
403 return $dbh->do("INSERT IGNORE $sql", @params);
404 } else {
405 # TODO: Detect bad multi-row insert here.
406 my $rv = eval { $dbh->do("INSERT $sql", @params); };
407 if ($@ || $dbh->err) {
408 return 1 if $self->was_duplicate_error;
409 # This chunk is identical to condthrow, but we include it directly
410 # here as we know there is definitely an error, and we would like
411 # to report the caller of this function.
412 my ($pkg, $fn, $line) = caller;
413 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
414 croak($msg);
416 return $rv;
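# Usage sketch: callers supply everything after the INSERT keyword, and this
# method prepends either "INSERT IGNORE" or a plain "INSERT" with duplicate
# errors swallowed, e.g.:
#
#   $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
#                        "VALUES (?,?,0)", undef, $fidid, $from_devid);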
420 sub retry_on_deadlock {
421 my $self = shift;
422 my $code = shift;
423 my $tries = shift || 3;
424 croak("deadlock retries must be positive") if $tries < 1;
425 my $rv;
427 while ($tries-- > 0) {
428 $rv = eval { $code->(); };
429 next if ($self->was_deadlock_error);
430 croak($@) if $@;
431 last;
433 return $rv;
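# Usage sketch: wrap a single statement that may hit a deadlock; it is retried
# up to $tries times (default 3) and the last return value is passed through.
#
#   $self->retry_on_deadlock(sub {
#       $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
#   });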
436 # --------------------------------------------------------------------------
438 my @extra_tables;
440 sub add_extra_tables {
441 my $class = shift;
442 push @extra_tables, @_;
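# Usage sketch for a hypothetical plugin: register the extra table name here
# and provide a matching TABLE_* (and optional INDEXES_*) method, which
# create_table() resolves by name during setup_database(). "plugin_example"
# and its columns are placeholders.
#
#   MogileFS::Store->add_extra_tables("plugin_example");
#   sub MogileFS::Store::TABLE_plugin_example {
#       "CREATE TABLE plugin_example (
#          fid   INT UNSIGNED NOT NULL PRIMARY KEY,
#          note  VARCHAR(255)
#       )"
#   }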
445 use constant TABLES => qw( domain class file tempfile file_to_delete
446 unreachable_fids file_on file_on_corrupt host
447 device server_settings file_to_replicate
448 file_to_delete_later fsck_log file_to_queue
449 file_to_delete2 );
451 sub setup_database {
452 my $sto = shift;
454 my $curver = $sto->schema_version;
456 my $latestver = SCHEMA_VERSION;
457 if ($curver == $latestver) {
458 $sto->status("Schema already up-to-date at version $curver.");
459 return 1;
462 if ($curver > $latestver) {
463 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
466 if ($curver) {
467 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
470 foreach my $t (TABLES, @extra_tables) {
471 $sto->create_table($t);
474 $sto->upgrade_add_host_getport;
475 $sto->upgrade_add_host_altip;
476 $sto->upgrade_add_device_asof;
477 $sto->upgrade_add_device_weight;
478 $sto->upgrade_add_device_readonly;
479 $sto->upgrade_add_device_drain;
480 $sto->upgrade_add_class_replpolicy;
481 $sto->upgrade_modify_server_settings_value;
482 $sto->upgrade_add_file_to_queue_arg;
483 $sto->upgrade_modify_device_size;
485 return 1;
488 sub cached_schema_version {
489 my $self = shift;
490 return $self->{_cached_schema_version} ||=
491 $self->schema_version;
494 sub schema_version {
495 my $self = shift;
496 my $dbh = $self->dbh;
497 return eval {
498 $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
499 } || 0;
502 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
504 sub create_table {
505 my ($self, $table) = @_;
506 my $dbh = $self->dbh;
507 return 1 if $self->table_exists($table);
508 my $meth = "TABLE_$table";
509 my $sql = $self->$meth;
510 $sql = $self->filter_create_sql($sql);
511 $self->status("Running SQL: $sql;");
512 $dbh->do($sql) or
513 die "Failed to create table $table: " . $dbh->errstr;
514 my $imeth = "INDEXES_$table";
515 my @indexes = eval { $self->$imeth };
516 foreach $sql (@indexes) {
517 $self->status("Running SQL: $sql;");
518 $dbh->do($sql) or
519 die "Failed to create indexes on $table: " . $dbh->errstr;
523 # Please try to keep all tables aligned nicely
524 # with '"CREATE TABLE' on the first line
525 # and ')"' alone on the last line.
527 sub TABLE_domain {
528 # classes are tied to domains. domains can have classes of items
529 # with different mindevcounts.
531 # a minimum devcount is the number of copies the system tries to
532 # maintain for files in that class
534 # unspecified classname means classid=0 (implicit class), and that
535 # implies mindevcount=2
536 "CREATE TABLE domain (
537 dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
538 namespace VARCHAR(255),
539 UNIQUE (namespace)
543 sub TABLE_class {
544 "CREATE TABLE class (
545 dmid SMALLINT UNSIGNED NOT NULL,
546 classid TINYINT UNSIGNED NOT NULL,
547 PRIMARY KEY (dmid,classid),
548 classname VARCHAR(50),
549 UNIQUE (dmid,classname),
550 mindevcount TINYINT UNSIGNED NOT NULL
554 # the length field is only here for easy verifications of content
555 # integrity when copying around. no sums or content types or other
556 # metadata here. application can handle that.
558 # classid is what class of file this belongs to. for instance, on fotobilder
559 # there will be a class for original pictures (the ones the user uploaded)
560 # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
561 # each domain can setup classes and assign the minimum redundancy level for
562 # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
563 # photos and a 1 minimum for derived images (which means the sole device
564 # for a derived image can die, bringing devcount to 0 for that file, but
565 # the application can recreate it from its original)
566 sub TABLE_file {
567 "CREATE TABLE file (
568 fid INT UNSIGNED NOT NULL,
569 PRIMARY KEY (fid),
571 dmid SMALLINT UNSIGNED NOT NULL,
572 dkey VARCHAR(255), # domain-defined
573 UNIQUE dkey (dmid, dkey),
575 length BIGINT UNSIGNED, # big limit
577 classid TINYINT UNSIGNED NOT NULL,
578 devcount TINYINT UNSIGNED NOT NULL,
579 INDEX devcount (dmid,classid,devcount)
583 sub TABLE_tempfile {
584 "CREATE TABLE tempfile (
585 fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
586 PRIMARY KEY (fid),
588 createtime INT UNSIGNED NOT NULL,
589 classid TINYINT UNSIGNED NOT NULL,
590 dmid SMALLINT UNSIGNED NOT NULL,
591 dkey VARCHAR(255),
592 devids VARCHAR(60)
596 # files marked for death when their key is overwritten. then they get a new
597 # fid, but since the old row (with the old fid) had to be deleted immediately,
598 # we need a place to store the fid so an async job can delete the file from
599 # all devices.
600 sub TABLE_file_to_delete {
601 "CREATE TABLE file_to_delete (
602 fid INT UNSIGNED NOT NULL,
603 PRIMARY KEY (fid)
607 # if the replicator notices that a fid has no sources, that file gets inserted
608 # into the unreachable_fids table. it is up to the application to actually
609 # handle fids stored in this table.
610 sub TABLE_unreachable_fids {
611 "CREATE TABLE unreachable_fids (
612 fid INT UNSIGNED NOT NULL,
613 lastupdate INT UNSIGNED NOT NULL,
614 PRIMARY KEY (fid),
615 INDEX (lastupdate)
619 # what files are on what devices? (most likely physical devices,
620 # as logical devices of RAID arrays would be costly, and mogilefs
621 # already handles redundancy)
623 # the devid index lets us answer "What files were on this now-dead disk?"
624 sub TABLE_file_on {
625 "CREATE TABLE file_on (
626 fid INT UNSIGNED NOT NULL,
627 devid MEDIUMINT UNSIGNED NOT NULL,
628 PRIMARY KEY (fid, devid),
629 INDEX (devid)
633 # if application or framework detects an error in one of the duplicate files
634 # for whatever reason, it can register its complaint and the framework
635 # will do some verifications and fix things up w/ an async job
636 # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
637 # on the other devices?
638 sub TABLE_file_on_corrupt {
639 "CREATE TABLE file_on_corrupt (
640 fid INT UNSIGNED NOT NULL,
641 devid MEDIUMINT UNSIGNED NOT NULL,
642 PRIMARY KEY (fid, devid)
646 # hosts (which contain devices...)
647 sub TABLE_host {
648 "CREATE TABLE host (
649 hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
651 status ENUM('alive','dead','down'),
652 http_port MEDIUMINT UNSIGNED DEFAULT 7500,
653 http_get_port MEDIUMINT UNSIGNED,
655 hostname VARCHAR(40),
656 hostip VARCHAR(15),
657 altip VARCHAR(15),
658 altmask VARCHAR(18),
659 UNIQUE (hostname),
660 UNIQUE (hostip),
661 UNIQUE (altip)
665 # disks...
666 sub TABLE_device {
667 "CREATE TABLE device (
668 devid MEDIUMINT UNSIGNED NOT NULL,
669 hostid MEDIUMINT UNSIGNED NOT NULL,
671 status ENUM('alive','dead','down'),
672 weight MEDIUMINT DEFAULT 100,
674 mb_total INT UNSIGNED,
675 mb_used INT UNSIGNED,
676 mb_asof INT UNSIGNED,
677 PRIMARY KEY (devid),
678 INDEX (status)
682 sub TABLE_server_settings {
683 "CREATE TABLE server_settings (
684 field VARCHAR(50) PRIMARY KEY,
685 value TEXT
689 sub TABLE_file_to_replicate {
690 # nexttry is time to try to replicate it next.
691 # 0 means immediate. it's only on one host.
692 # 1 means lower priority. it's on 2+ but isn't happy where it's at.
693 # unix timestamp means at/after that time. some previous error occurred.
694 # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
695 # failcount. how many times we've failed, just for doing backoff of nexttry.
696 # flags. reserved for future use.
697 "CREATE TABLE file_to_replicate (
698 fid INT UNSIGNED NOT NULL PRIMARY KEY,
699 nexttry INT UNSIGNED NOT NULL,
700 INDEX (nexttry),
701 fromdevid INT UNSIGNED,
702 failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
703 flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
707 sub TABLE_file_to_delete_later {
708 "CREATE TABLE file_to_delete_later (
709 fid INT UNSIGNED NOT NULL PRIMARY KEY,
710 delafter INT UNSIGNED NOT NULL,
711 INDEX (delafter)
715 sub TABLE_fsck_log {
716 "CREATE TABLE fsck_log (
717 logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
718 PRIMARY KEY (logid),
719 utime INT UNSIGNED NOT NULL,
720 fid INT UNSIGNED NULL,
721 evcode CHAR(4),
722 devid MEDIUMINT UNSIGNED,
723 INDEX(utime)
727 # generic queue table, designed to be used for workers/jobs which aren't
728 # constantly in use, and are async to the user.
729 # e.g. fsck, drain, rebalance.
730 sub TABLE_file_to_queue {
731 "CREATE TABLE file_to_queue (
732 fid INT UNSIGNED NOT NULL,
733 devid INT UNSIGNED,
734 type TINYINT UNSIGNED NOT NULL,
735 nexttry INT UNSIGNED NOT NULL,
736 failcount TINYINT UNSIGNED NOT NULL default '0',
737 flags SMALLINT UNSIGNED NOT NULL default '0',
738 arg TEXT,
739 PRIMARY KEY (fid, type),
740 INDEX type_nexttry (type,nexttry)
744 # new style async delete table.
745 # this is separate from file_to_queue since deletes are more actively used,
746 # and partitioning on 'type' doesn't always work so well.
747 sub TABLE_file_to_delete2 {
748 "CREATE TABLE file_to_delete2 (
749 fid INT UNSIGNED NOT NULL PRIMARY KEY,
750 nexttry INT UNSIGNED NOT NULL,
751 failcount TINYINT UNSIGNED NOT NULL default '0',
752 INDEX nexttry (nexttry)
756 # these five are only necessary for MySQL, since no other database existed
757 # before, so they can just create the tables correctly to begin with.
758 # in the future, there might be new alters that non-MySQL databases
759 # will have to implement.
760 sub upgrade_add_host_getport { 1 }
761 sub upgrade_add_host_altip { 1 }
762 sub upgrade_add_device_asof { 1 }
763 sub upgrade_add_device_weight { 1 }
764 sub upgrade_add_device_readonly { 1 }
765 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
766 sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
767 sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
768 sub upgrade_modify_device_size { die "Not implemented in $_[0]" }
770 sub upgrade_add_class_replpolicy {
771 my ($self) = @_;
772 unless ($self->column_type("class", "replpolicy")) {
773 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
777 # return true if deleted, 0 if didn't exist, exception if error
778 sub delete_host {
779 my ($self, $hostid) = @_;
780 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
783 # return true if deleted, 0 if didn't exist, exception if error
784 sub delete_domain {
785 my ($self, $dmid) = @_;
786 throw("has_files") if $self->domain_has_files($dmid);
787 throw("has_classes") if $self->domain_has_classes($dmid);
788 return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
791 sub domain_has_files {
792 my ($self, $dmid) = @_;
793 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
794 undef, $dmid);
795 return $has_a_fid ? 1 : 0;
798 sub domain_has_classes {
799 my ($self, $dmid) = @_;
800 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? LIMIT 1',
801 undef, $dmid);
802 return $has_a_class ? 1 : 0;
805 sub class_has_files {
806 my ($self, $dmid, $clid) = @_;
807 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
808 undef, $dmid, $clid);
809 return $has_a_fid ? 1 : 0;
812 # return new classid on success (non-zero integer), die on failure
813 # throw 'dup' on duplicate name
814 # override this if you want a less racy version.
815 sub create_class {
816 my ($self, $dmid, $classname) = @_;
817 my $dbh = $self->dbh;
819 # get the max class id in this domain
820 my $maxid = $dbh->selectrow_array
821 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
823 my $clsid = $maxid + 1;
824 if ($classname eq 'default') {
825 $clsid = 0;
828 # now insert the new class
829 my $rv = eval {
830 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
831 undef, $dmid, $clsid, $classname, 2);
833 if ($@ || $dbh->err) {
834 if ($self->was_duplicate_error) {
835 throw("dup");
838 return $clsid if $rv;
839 $self->condthrow;
840 die;
843 # return 1 on success, throw "dup" on duplicate name error, die otherwise
844 sub update_class_name {
845 my $self = shift;
846 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
847 my $rv = eval {
848 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
849 undef, $arg{classname}, $arg{dmid}, $arg{classid});
851 throw("dup") if $self->was_duplicate_error;
852 $self->condthrow;
853 return 1;
856 # return 1 on success, die otherwise
857 sub update_class_mindevcount {
858 my $self = shift;
859 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
860 eval {
861 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
862 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
864 $self->condthrow;
865 return 1;
868 # return 1 on success, die otherwise
869 sub update_class_replpolicy {
870 my $self = shift;
871 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
872 eval {
873 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
874 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
876 $self->condthrow;
877 return 1;
880 sub nfiles_with_dmid_classid_devcount {
881 my ($self, $dmid, $classid, $devcount) = @_;
882 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
883 undef, $dmid, $classid, $devcount);
886 sub set_server_setting {
887 my ($self, $key, $val) = @_;
888 my $dbh = $self->dbh;
889 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
891 eval {
892 if (defined $val) {
893 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
894 } else {
895 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
899 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
900 return 1;
903 # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
904 sub incr_server_setting {
905 my ($self, $key, $val) = @_;
906 $val = 1 unless defined $val;
907 return unless $val;
909 return 1 if $self->dbh->do("UPDATE server_settings ".
910 "SET value=value+? ".
911 "WHERE field=?", undef,
912 $val, $key) > 0;
913 $self->set_server_setting($key, $val);
916 sub server_setting {
917 my ($self, $key) = @_;
918 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
919 undef, $key);
922 sub server_settings {
923 my ($self) = @_;
924 my $ret = {};
925 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
926 $sth->execute;
927 while (my ($k, $v) = $sth->fetchrow_array) {
928 $ret->{$k} = $v;
930 return $ret;
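# Usage sketch ("example_setting" and "example_counter" are placeholder keys):
#
#   $sto->set_server_setting("example_setting", "on");    # upsert
#   my $v   = $sto->server_setting("example_setting");    # single value
#   my $all = $sto->server_settings;                       # hashref of field => value
#   $sto->incr_server_setting("example_counter", 1);       # add; creates the row if missing
#   $sto->set_server_setting("example_setting", undef);    # delete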
933 # register a tempfile and return the fidid, which should be allocated
934 # using autoincrement/sequences if the passed in fid is undef. however,
935 # if fid is passed in, that value should be used and returned.
937 # return new/passed in fidid on success.
938 # throw 'dup' if fid already in use
939 # return 0/undef/die on failure
941 sub register_tempfile {
942 my $self = shift;
943 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
945 my $dbh = $self->dbh;
946 my $fid = $arg{fid};
948 my $explicit_fid_used = $fid ? 1 : 0;
950 # setup the new mapping. we store the devices that we picked for
951 # this file in here, knowing that they might not be used. create_close
952 # is responsible for actually mapping in file_on. NOTE: fid is being
953 # passed in, it's either some number they gave us, or it's going to be
954 # 0/undef which translates into NULL which means to automatically create
955 # one. that should be fine.
956 my $ins_tempfile = sub {
957 my $rv = eval {
958 # We must only pass the correct number of bind parameters
959 # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
960 # Postgres, where you are expected to leave it out or use DEFAULT
961 # Leaving it out seems sanest and least likely to cause problems
962 # with other databases.
963 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
964 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
965 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
966 # Do not check for $explicit_fid_used, but rather $fid directly
967 # as this anonymous sub is called from the loop later
968 if($fid) {
969 unshift @keys, 'fid';
970 unshift @vars, '?';
971 unshift @vals, $fid;
973 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
974 $dbh->do($sql, undef, @vals);
976 if (!$rv) {
977 return undef if $self->was_duplicate_error;
978 die "Unexpected db error into tempfile: " . $dbh->errstr;
981 unless (defined $fid) {
982 # if they did not give us a fid, then we want to grab the one that was
983 # theoretically automatically generated
984 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
985 or die "No last_insert_id found";
987 return undef unless defined $fid && $fid > 0;
988 return 1;
991 unless ($ins_tempfile->()) {
992 throw("dup") if $explicit_fid_used;
993 die "tempfile insert failed";
996 my $fid_in_use = sub {
997 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
998 return $exists ? 1 : 0;
1001 # See notes in MogileFS::Config->check_database
1002 my $min_fidid = MogileFS::Config->config('min_fidid');
1004 # if the fid is in use, do something
1005 while ($fid_in_use->($fid) || $fid <= $min_fidid) {
1006 throw("dup") if $explicit_fid_used;
1008 # be careful of databases which reset their
1009 # auto-increment/sequences when the table is empty (InnoDB
1010 # did/does this, for instance). So check if it's in use, and
1011 # re-seed the table with the highest known fid from the file
1012 # table.
1014 # get the highest fid from the filetable and insert a dummy row
1015 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
1016 $ins_tempfile->(); # don't care about its result
1018 # then do a normal auto-increment
1019 $fid = undef;
1020 $ins_tempfile->() or die "register_tempfile failed after seeding";
1023 return $fid;
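# Usage sketch ($dmid, $key and the devids string are placeholders); pass
# fid => undef to let the tempfile AUTO_INCREMENT/sequence allocate one:
#
#   my $fidid = $sto->register_tempfile(
#       fid     => undef,
#       dmid    => $dmid,
#       key     => $key,
#       classid => 0,
#       devids  => "3,7,12",     # devices picked for the upload
#   );
#   # throws "dup" if an explicit fid was passed and is already in use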
1026 # return hashref of row containing columns "fid, dmid, dkey, length,
1027 # classid, devcount" provided a $dmid and $key (dkey). or undef if no
1028 # row.
1029 sub file_row_from_dmid_key {
1030 my ($self, $dmid, $key) = @_;
1031 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1032 "FROM file WHERE dmid=? AND dkey=?",
1033 undef, $dmid, $key);
1036 # return hashref of row containing columns "fid, dmid, dkey, length,
1037 # classid, devcount" provided a $fidid or undef if no row.
1038 sub file_row_from_fidid {
1039 my ($self, $fidid) = @_;
1040 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1041 "FROM file WHERE fid=?",
1042 undef, $fidid);
1045 # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1046 # classid, devcount" given a starting $fidid and a row count (empty arrayref if no rows match).
1047 sub file_row_from_fidid_range {
1048 my ($self, $fromfid, $count) = @_;
1049 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1050 "FROM file WHERE fid > ? LIMIT ?");
1051 $sth->execute($fromfid,$count);
1052 return $sth->fetchall_arrayref({});
1055 # return array of devids that a fidid is on
1056 sub fid_devids {
1057 my ($self, $fidid) = @_;
1058 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
1059 undef, $fidid) || [] };
1062 # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1063 sub fid_devids_multiple {
1064 my ($self, @fidids) = @_;
1065 my $in = join(",", map { $_+0 } @fidids);
1066 my $ret = {};
1067 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1068 $sth->execute;
1069 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1070 push @{$ret->{$fidid} ||= []}, $devid;
1072 return $ret;
1075 # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1076 sub tempfile_row_from_fid {
1077 my ($self, $fidid) = @_;
1078 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1079 "FROM tempfile WHERE fid=?",
1080 undef, $fidid);
1083 # return 1 on success; throw "dup" on duplicate devid, or throw another error on failure
1084 sub create_device {
1085 my ($self, $devid, $hostid, $status) = @_;
1086 my $rv = $self->conddup(sub {
1087 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1088 $devid, $hostid, $status);
1090 $self->condthrow;
1091 die "error making device $devid\n" unless $rv > 0;
1092 return 1;
1095 sub update_device {
1096 my ($self, $devid, $to_update) = @_;
1097 my @keys = sort keys %$to_update;
1098 return unless @keys;
1099 $self->conddup(sub {
1100 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1101 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
1102 $devid);
1104 return 1;
1107 sub update_device_usage {
1108 my $self = shift;
1109 my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @_);
1110 eval {
1111 $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
1112 " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
1114 $self->condthrow;
1117 # This is unimplemented at the moment as we must verify:
1118 # - no file_on rows exist
1119 # - nothing in file_to_queue is going to attempt to use it
1120 # - nothing in file_to_replicate is going to attempt to use it
1121 # - it's already been marked dead
1122 # - that all trackers are likely to know this :/
1123 # - ensure the devid can't be reused
1124 # IE; the user can't mark it dead then remove it all at once and cause their
1125 # cluster to implode.
1126 sub delete_device {
1127 die "Unimplemented; needs further testing";
1130 sub mark_fidid_unreachable {
1131 my ($self, $fidid) = @_;
1132 die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
1133 $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
1134 undef, $fidid);
1137 sub set_device_weight {
1138 my ($self, $devid, $weight) = @_;
1139 eval {
1140 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1142 $self->condthrow;
1145 sub set_device_state {
1146 my ($self, $devid, $state) = @_;
1147 eval {
1148 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1150 $self->condthrow;
1153 sub delete_class {
1154 my ($self, $dmid, $cid) = @_;
1155 throw("has_files") if $self->class_has_files($dmid, $cid);
1156 eval {
1157 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1159 $self->condthrow;
1162 sub delete_fidid {
1163 my ($self, $fidid) = @_;
1164 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
1165 $self->condthrow;
1166 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1167 $self->condthrow;
1168 $self->enqueue_for_delete2($fidid, 0);
1169 $self->condthrow;
1172 sub delete_tempfile_row {
1173 my ($self, $fidid) = @_;
1174 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
1175 $self->condthrow;
1176 return $rv;
1179 # Load the specified tempfile, then delete it. If we succeed, we were
1180 # here first; otherwise, someone else beat us here (and we return undef)
1181 sub delete_and_return_tempfile_row {
1182 my ($self, $fidid) = @_;
1183 my $rv = $self->tempfile_row_from_fid($fidid);
1184 my $rows_deleted = $self->delete_tempfile_row($fidid);
1185 return $rv if ($rows_deleted > 0);
1188 sub replace_into_file {
1189 my $self = shift;
1190 my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
1191 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1192 eval {
1193 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1194 "VALUES (?,?,?,?,?,0) ", undef,
1195 @arg{'fidid', 'dmid', 'key', 'length', 'classid'});
1197 $self->condthrow;
1200 # returns 1 on success, 0 on duplicate key error, dies on exception
1201 # TODO: need a test to hit the duplicate name error condition
1202 # TODO: switch to using "dup" exception here?
1203 sub rename_file {
1204 my ($self, $fidid, $to_key) = @_;
1205 my $dbh = $self->dbh;
1206 eval {
1207 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1208 undef, $to_key, $fidid);
1210 if ($@ || $dbh->err) {
1211 # first is MySQL's error code for duplicates
1212 if ($self->was_duplicate_error) {
1213 return 0;
1214 } else {
1215 die $@;
1218 $self->condthrow;
1219 return 1;
1222 sub get_domainid_by_name {
1223 my $self = shift;
1224 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
1225 undef, $_[0]);
1226 return $dmid;
1229 # returns a hash of domains. Key is namespace, value is dmid.
1230 sub get_all_domains {
1231 my ($self) = @_;
1232 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1233 return map { ($_->[0], $_->[1]) } @{$domains || []};
1236 sub get_classid_by_name {
1237 my $self = shift;
1238 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
1239 undef, $_[0], $_[1]);
1240 return $classid;
1243 # returns an array of hashrefs, one hashref per row in the 'class' table
1244 sub get_all_classes {
1245 my ($self) = @_;
1246 my (@ret, $row);
1248 my $repl_col = "";
1249 if ($self->cached_schema_version >= 10) {
1250 $repl_col = ", replpolicy";
1253 my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount $repl_col FROM class");
1254 $sth->execute;
1255 push @ret, $row while $row = $sth->fetchrow_hashref;
1256 return @ret;
1259 # add a record of fidid existing on devid
1260 # returns 1 on success, 0 on duplicate
1261 sub add_fidid_to_devid {
1262 my ($self, $fidid, $devid) = @_;
1263 croak("fidid not non-zero") unless $fidid;
1264 croak("devid not non-zero") unless $devid;
1266 # TODO: This should possibly be insert_ignore instead.
1267 # If we are adding an extra file_on entry, we do not want to replace the
1268 # existing one. Check REPLACE semantics.
1269 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
1270 undef, $fidid, $devid);
1271 return 1 if $rv > 0;
1272 return 0;
1275 # remove a record of fidid existing on devid
1276 # returns 1 on success, 0 if not there anyway
1277 sub remove_fidid_from_devid {
1278 my ($self, $fidid, $devid) = @_;
1279 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
1280 undef, $fidid, $devid); };
1281 $self->condthrow;
1282 return $rv;
1285 # Test if host exists.
1286 sub get_hostid_by_id {
1287 my $self = shift;
1288 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
1289 undef, $_[0]);
1290 return $hostid;
1293 sub get_hostid_by_name {
1294 my $self = shift;
1295 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
1296 undef, $_[0]);
1297 return $hostid;
1300 # get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
1301 sub get_all_hosts {
1302 my ($self) = @_;
1303 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
1304 "hostip, http_port, http_get_port, altip, altmask FROM host");
1305 $sth->execute;
1306 my @ret;
1307 while (my $row = $sth->fetchrow_hashref) {
1308 push @ret, $row;
1310 return @ret;
1313 # get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
1314 sub get_all_devices {
1315 my ($self) = @_;
1316 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
1317 "mb_used, mb_asof, status, weight FROM device");
1318 $self->condthrow;
1319 $sth->execute;
1320 my @return;
1321 while (my $row = $sth->fetchrow_hashref) {
1322 push @return, $row;
1324 return @return;
1327 # update the device count for a given fidid
1328 sub update_devcount {
1329 my ($self, $fidid) = @_;
1330 my $dbh = $self->dbh;
1331 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
1332 undef, $fidid);
1334 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
1335 $ct, $fidid); };
1336 $self->condthrow;
1338 return 1;
1341 # update the classid for a given fidid
1342 sub update_classid {
1343 my ($self, $fidid, $classid) = @_;
1344 my $dbh = $self->dbh;
1346 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
1347 $classid, $fidid);
1349 $self->condthrow;
1350 return 1;
1353 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
1354 sub enqueue_for_replication {
1355 my ($self, $fidid, $from_devid, $in) = @_;
1357 my $nexttry = 0;
1358 if ($in) {
1359 $nexttry = $self->unix_timestamp . " + " . int($in);
1362 $self->retry_on_deadlock(sub {
1363 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1364 "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1368 # enqueue a fidid for delete
1369 # note: if we get one more "independent" queue like this, the
1370 # code should be collapsable? I tried once and it looked too ugly, so we have
1371 # some redundancy.
1372 sub enqueue_for_delete2 {
1373 my ($self, $fidid, $in) = @_;
1375 $in = 0 unless $in;
1376 my $nexttry = $self->unix_timestamp . " + " . int($in);
1378 $self->retry_on_deadlock(sub {
1379 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
1380 "VALUES (?,$nexttry)", undef, $fidid);
1384 # enqueue a fidid for work
1385 sub enqueue_for_todo {
1386 my ($self, $fidid, $type, $in) = @_;
1388 $in = 0 unless $in;
1389 my $nexttry = $self->unix_timestamp . " + " . int($in);
1391 $self->retry_on_deadlock(sub {
1392 if (ref($fidid)) {
1393 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
1394 "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
1395 $fidid->[0], $fidid->[1], $fidid->[2], $type);
1396 } else {
1397 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
1398 "VALUES (?,?,$nexttry)", undef, $fidid, $type);
1403 # return 1 on success. die otherwise.
1404 sub enqueue_many_for_todo {
1405 my ($self, $fidids, $type, $in) = @_;
1406 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
1407 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
1408 return 1;
1411 $in = 0 unless $in;
1412 my $nexttry = $self->unix_timestamp . " + " . int($in);
1414 # TODO: convert to prepared statement?
1415 $self->retry_on_deadlock(sub {
1416 if (ref($fidids->[0]) eq 'ARRAY') {
1417 my $sql = $self->ignore_replace .
1418 "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1419 join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
1420 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
1421 } else {
1422 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1423 nexttry) VALUES " .
1424 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
1427 $self->condthrow;
1430 # For file_to_queue queues that should be kept small, find the size.
1431 # This isn't fast, but for small queues won't be slow, and is usually only run
1432 # from a single tracker.
1433 sub file_queue_length {
1434 my $self = shift;
1435 my $type = shift;
1437 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
1438 "WHERE type = ?", undef, $type);
1441 # reschedule all deferred replication, return number rescheduled
1442 sub replicate_now {
1443 my ($self) = @_;
1445 $self->retry_on_deadlock(sub {
1446 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
1447 " WHERE nexttry > " . $self->unix_timestamp);
1451 # takes two arguments, devid and limit, both required. returns an arrayref of fidids.
1452 sub get_fidids_by_device {
1453 my ($self, $devid, $limit) = @_;
1455 my $dbh = $self->dbh;
1456 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
1457 undef, $devid);
1458 return $fidids;
1461 # finds a chunk of fids given a set of constraints:
1462 # devid, fidid, age (new or old), limit
1463 # Note that if this function is very slow on your large DB, you're likely
1464 # sorting by "newfiles" and are missing a new index.
1465 # returns an arrayref of fidids
1466 sub get_fidid_chunks_by_device {
1467 my ($self, %o) = @_;
1469 my $dbh = $self->dbh;
1470 my $devid = delete $o{devid};
1471 croak("must supply at least a devid") unless $devid;
1472 my $age = delete $o{age};
1473 my $fidid = delete $o{fidid};
1474 my $limit = delete $o{limit};
1475 croak("invalid options: " . join(', ', keys %o)) if %o;
1476 # If supplied a "previous" fidid, we're paging through.
1477 my $fidsort = '';
1478 my $order = '';
1479 $age ||= 'old';
1480 if ($age eq 'old') {
1481 $fidsort = 'AND fid > ?' if $fidid;
1482 $order = 'ASC';
1483 } elsif ($age eq 'new') {
1484 $fidsort = 'AND fid < ?' if $fidid;
1485 $order = 'DESC';
1486 } else {
1487 croak("invalid age argument: " . $age);
1489 $limit ||= 100;
1490 my @extra = ();
1491 push @extra, $fidid if $fidid;
1493 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
1494 $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
1495 return $fidids;
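# Usage sketch: page through a device's fids by feeding the last fid of one
# chunk back in as 'fidid' for the next call ($devid is a placeholder):
#
#   my $last;
#   while (1) {
#       my $fids = $sto->get_fidid_chunks_by_device(
#           devid => $devid, age => 'old', limit => 1000,
#           ($last ? (fidid => $last) : ()),
#       );
#       last unless @$fids;
#       # ... process @$fids ...
#       $last = $fids->[-1];
#   }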
1498 # takes two arguments, fidid to be above, and optional limit (default
1499 # 1,000). returns up to that many fidids above the provided
1500 # fidid. returns array of MogileFS::FID objects, sorted by fid ids.
1501 sub get_fids_above_id {
1502 my ($self, $fidid, $limit) = @_;
1503 $limit ||= 1000;
1504 $limit = int($limit);
1506 my @ret;
1507 my $dbh = $self->dbh;
1508 my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1509 "FROM file ".
1510 "WHERE fid > ? ".
1511 "ORDER BY fid LIMIT $limit");
1512 $sth->execute($fidid);
1513 while (my $row = $sth->fetchrow_hashref) {
1514 push @ret, MogileFS::FID->new_from_db_row($row);
1516 return @ret;
1519 # Same as above, but returns unblessed hashref.
1520 sub get_fidids_above_id {
1521 my ($self, $fidid, $limit) = @_;
1522 $limit ||= 1000;
1523 $limit = int($limit);
1525 my $dbh = $self->dbh;
1526 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file WHERE fid > ?
1527 ORDER BY fid LIMIT $limit}, undef, $fidid);
1528 return $fidids;
1531 # creates a new domain, given a domain namespace string. return the dmid on success,
1532 # throw 'dup' on duplicate name.
1533 # override if you want a less racy version.
1534 sub create_domain {
1535 my ($self, $name) = @_;
1536 my $dbh = $self->dbh;
1538 # get the max domain id
1539 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
1540 my $rv = eval {
1541 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
1542 undef, $maxid + 1, $name);
1544 if ($self->was_duplicate_error) {
1545 throw("dup");
1547 return $maxid+1 if $rv;
1548 die "failed to make domain"; # FIXME: the above is racy.
1551 sub update_host {
1552 my ($self, $hid, $to_update) = @_;
1553 my @keys = sort keys %$to_update;
1554 return unless @keys;
1555 $self->conddup(sub {
1556 $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
1557 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
1558 $hid);
1560 return 1;
1563 sub update_host_property {
1564 my ($self, $hostid, $col, $val) = @_;
1565 $self->conddup(sub {
1566 $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
1568 return 1;
1571 # return new hostid, or throw 'dup' on error.
1572 # NOTE: you need to put them into the initial 'down' state.
1573 sub create_host {
1574 my ($self, $hostname, $ip) = @_;
1575 my $dbh = $self->dbh;
1576 # racy! lazy. no, better: portable! how often does this happen? :)
1577 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
1578 my $rv = $self->conddup(sub {
1579 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
1580 "VALUES (?, ?, ?, 'down')",
1581 undef, $hid, $hostname, $ip);
1583 return $hid if $rv;
1584 die "db failure";
1587 # return array of row hashrefs containing columns: (fid, fromdevid,
1588 # failcount, flags, nexttry)
1589 sub files_to_replicate {
1590 my ($self, $limit) = @_;
1591 my $ut = $self->unix_timestamp;
1592 my $to_repl_map = $self->dbh->selectall_hashref(qq{
1593 SELECT fid, fromdevid, failcount, flags, nexttry
1594 FROM file_to_replicate
1595 WHERE nexttry <= $ut
1596 ORDER BY nexttry
1597 LIMIT $limit
1598 }, "fid") or return ();
1599 return values %$to_repl_map;
1602 # "new" style queue consumption code.
1603 # from within a transaction, fetch a limit of fids,
1604 # then update each fid's nexttry to be off in the future,
1605 # giving local workers some time to dequeue the items.
1606 # Note:
1607 # DBI (even with RaiseError) returns weird errors on
1608 # deadlocks from selectall_hashref. So we can't do that.
1609 # we also used to retry on deadlock within the routine,
1610 # but instead let's return undef and let job_master retry.
1611 sub grab_queue_chunk {
1612 my $self = shift;
1613 my $queue = shift;
1614 my $limit = shift;
1615 my $extfields = shift;
1617 my $dbh = $self->dbh;
1618 my $tries = 3;
1619 my $work;
1621 return 0 unless $self->lock_queue($queue);
1623 my $extwhere = shift || '';
1624 my $fields = 'fid, nexttry, failcount';
1625 $fields .= ', ' . $extfields if $extfields;
1626 eval {
1627 $dbh->begin_work;
1628 my $ut = $self->unix_timestamp;
1629 my $query = qq{
1630 SELECT $fields
1631 FROM $queue
1632 WHERE nexttry <= $ut
1633 $extwhere
1634 ORDER BY nexttry
1635 LIMIT $limit
1637 $query .= "FOR UPDATE\n" if $self->can_for_update;
1638 my $sth = $dbh->prepare($query);
1639 $sth->execute;
1640 $work = $sth->fetchall_hashref('fid');
1641 # Nothing to work on.
1642 # Now claim the fids for a while.
1643 # TODO: Should be configurable... but not necessary.
1644 my $fidlist = join(',', keys %$work);
1645 unless ($fidlist) { $dbh->commit; return; }
1646 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1647 $dbh->commit;
1649 $self->unlock_queue($queue);
1650 if ($self->was_deadlock_error) {
1651 eval { $dbh->rollback };
1652 return ();
1654 $self->condthrow;
1656 return defined $work ? values %$work : ();
1659 sub grab_files_to_replicate {
1660 my ($self, $limit) = @_;
1661 return $self->grab_queue_chunk('file_to_replicate', $limit,
1662 'fromdevid, flags');
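# Worker-loop sketch: grab a claimed chunk, act on each row, then either
# delete the queue row on success or push it back out with a backoff.
# do_replicate() is a placeholder for the real replication work.
#
#   foreach my $row ($sto->grab_files_to_replicate(100)) {
#       if (do_replicate($row->{fid}, $row->{fromdevid})) {
#           $sto->delete_fid_from_file_to_replicate($row->{fid});
#       } else {
#           $sto->reschedule_file_to_replicate_relative($row->{fid}, 60);
#       }
#   }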
1665 sub grab_files_to_delete2 {
1666 my ($self, $limit) = @_;
1667 return $self->grab_queue_chunk('file_to_delete2', $limit);
1670 # $extwhere is ugly... but should be fine.
1671 sub grab_files_to_queued {
1672 my ($self, $type, $what, $limit) = @_;
1673 $what ||= 'type, flags';
1674 return $self->grab_queue_chunk('file_to_queue', $limit,
1675 $what, 'AND type = ' . $type);
1678 # although it's safe to have multiple tracker hosts and/or processes
1679 # replicating the same file around, it's inefficient CPU/time-wise,
1680 # and it's also possible they pick different places and waste disk.
1681 # so the replicator asks the store interface when it's about to start
1682 # and when it's done replicating a fidid, so you can do something smart
1683 # and tell it not to.
1684 sub should_begin_replicating_fidid {
1685 my ($self, $fidid) = @_;
1686 warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
1690 # called when replicator is done replicating a fid, so you can cleanup
1691 # whatever you did in 'should_begin_replicating_fidid' above.
1693 # NOTE: there's a theoretical race condition in the rebalance code,
1694 # where (without locking as provided by
1695 # should_begin_replicating_fidid/note_done_replicating), all copies of
1696 # a file can be deleted by independent replicators doing rebalancing
1697 # in different ways. so you'll probably want to implement some
1698 # locking in this pair of functions.
1699 sub note_done_replicating {
1700 my ($self, $fidid) = @_;
1703 sub find_fid_from_file_to_replicate {
1704 my ($self, $fidid) = @_;
1705 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
1706 undef, $fidid);
1709 sub find_fid_from_file_to_delete2 {
1710 my ($self, $fidid) = @_;
1711 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
1712 undef, $fidid);
1715 sub find_fid_from_file_to_queue {
1716 my ($self, $fidid, $type) = @_;
1717 return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
1718 undef, $fidid, $type);
1721 sub delete_fid_from_file_to_replicate {
1722 my ($self, $fidid) = @_;
1723 $self->retry_on_deadlock(sub {
1724 $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
1728 sub delete_fid_from_file_to_queue {
1729 my ($self, $fidid, $type) = @_;
1730 $self->retry_on_deadlock(sub {
1731 $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
1732 undef, $fidid, $type);
1736 sub delete_fid_from_file_to_delete2 {
1737 my ($self, $fidid) = @_;
1738 $self->retry_on_deadlock(sub {
1739 $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
1743 sub reschedule_file_to_replicate_absolute {
1744 my ($self, $fid, $abstime) = @_;
1745 $self->retry_on_deadlock(sub {
1746 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1747 undef, $abstime, $fid);
1751 sub reschedule_file_to_replicate_relative {
1752 my ($self, $fid, $in_n_secs) = @_;
1753 $self->retry_on_deadlock(sub {
1754 $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
1755 "failcount = failcount + 1 WHERE fid = ?",
1756 undef, $in_n_secs, $fid);
1760 sub reschedule_file_to_delete2_absolute {
1761 my ($self, $fid, $abstime) = @_;
1762 $self->retry_on_deadlock(sub {
1763 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1764 undef, $abstime, $fid);
1768 sub reschedule_file_to_delete2_relative {
1769 my ($self, $fid, $in_n_secs) = @_;
1770 $self->retry_on_deadlock(sub {
1771 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
1772 "failcount = failcount + 1 WHERE fid = ?",
1773 undef, $in_n_secs, $fid);
1777 # Given a dmid, prefix, after, and limit, return an arrayref of dkeys from the file
1778 # table.
1779 sub get_keys_like {
1780 my ($self, $dmid, $prefix, $after, $limit) = @_;
1781 # fix the input... prefix always ends with a % so that it works
1782 # in a LIKE call, and after is either blank or something
1783 $prefix = '' unless defined $prefix;
1784 $prefix .= '%';
1785 $after = '' unless defined $after;
1787 # now select out our keys
1788 return $self->dbh->selectcol_arrayref
1789 ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
1790 "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
1793 # return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
1794 # that were created $secs_ago seconds ago or older.
1795 sub old_tempfiles {
1796 my ($self, $secs_old) = @_;
1797 return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
1798 "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead; if we are adding an
    # extra file_on entry, we do not want to replace the existing one.
    # Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}

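# Illustrative sketch only (hypothetical helper, never called by the server):
# after a worker has verified new copies of a fid, it can record every new
# location in one call; mass_insert_file_on() itself falls back to per-row
# inserts when the backing store can't do multi-row inserts.
sub _example_record_locations {
    my ($sto, @devfid_objs) = @_;   # @devfid_objs are MogileFS::DevFID objects
    return 0 unless @devfid_objs;
    $sto->mass_insert_file_on(@devfid_objs);
    return scalar @devfid_objs;     # number of locations recorded
}
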
# note: "vesion" is a historical misspelling; the name is left as-is to avoid
# breaking callers that use it.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}

# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };
}

# return 1 on success. die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub: if the first row causes the duplicate error, the
    # remaining rows are not processed. Fall back to one-by-one in that case.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                       join(",", map { "(" . int($_) . ")" } @fidids));
    });
    $self->condthrow;
}

sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub: if the first row causes the duplicate error, the
    # remaining rows are not processed. Fall back to one-by-one in that case.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
        nexttry) VALUES " .
        join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
    });
    $self->condthrow;
}

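# Illustrative sketch only (hypothetical helper, never called by the server):
# queueing a batch of fids for deletion via the newer file_to_delete2 queue;
# per the comments above, this dies on error rather than returning false.
sub _example_enqueue_deletes {
    my ($sto, @fidids) = @_;
    return unless @fidids;
    $sto->enqueue_fids_to_delete2(@fidids);
}
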
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}

# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $logid = $self->max_fsck_logid;

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future.
    my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
    # both inclusive:
    my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
    $min_logid++;
    my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
    while (my ($evcode, $ct) = each %$cts) {
        $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
    }
    $self->set_server_setting("fsck_logid_processed", $logid);

    $self->release_lock($lockname) if $lock;
}

sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef,
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;
    $self->condthrow;

    return 1;
}

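# Illustrative sketch only (hypothetical helper and event code, never called
# by the server): logging an fsck event observed for a fid on a given device.
sub _example_log_fsck_event {
    my ($sto, $fidid, $devid) = @_;
    $sto->fsck_log(fid => $fidid, code => "EXMP", devid => $devid);
}
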
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}

# returns array of $row hashrefs, from fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}

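# Illustrative sketch only (hypothetical helper, never called by the server):
# draining the fsck log incrementally by remembering the last logid seen.
sub _example_read_fsck_log {
    my ($sto) = @_;
    my $last_logid = 0;
    while (my @rows = $sto->fsck_log_rows($last_logid, 100)) {
        for my $row (@rows) {
            # each $row has logid, utime, fid, evcode, devid keys
            $last_logid = $row->{logid};
        }
        last if @rows < 100;   # a short page means we've caught up
    }
    return $last_logid;
}
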
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr = delete $opts{logid_range};
    die if %opts;

    my $ret = {};
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}

# run before daemonizing. you can die from here if you see something's amiss,
# or emit warnings.
sub pre_daemonize_checks { }

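# Illustrative sketch only: a database-specific subclass could override
# pre_daemonize_checks() to fail fast before the daemon forks, along these
# lines (the driver module name here is hypothetical):
#
#   sub pre_daemonize_checks {
#       my $self = shift;
#       eval { require DBD::ExampleDriver; 1 }
#           or die "DBD::ExampleDriver is required for this store\n";
#       $self->SUPER::pre_daemonize_checks;
#   }
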
# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}

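# Illustrative sketch only (hypothetical names, never called by the server):
# the usual pattern for using the advisory locks, assuming a store whose
# subclass actually implements get_lock()/release_lock().
sub _example_with_lock {
    my ($sto, $code) = @_;
    my $lockname = 'mgfs:example';
    return 0 unless $sto->get_lock($lockname, 10);   # 1 on success, 0 on timeout
    eval { $code->() };
    my $err = $@;
    $sto->release_lock($lockname);
    die $err if $err;
    return 1;
}
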
# MySQL has an issue where you either get excessive deadlocks, or INSERTs
# hang forever around some transactions. Use ghetto locking to cope.
sub lock_queue { 1 }
sub unlock_queue { 1 }

# returns up to $limit @fidids which are on provided $devid
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random. and good chances these will
    # eventually get to point where they're un-rebalance-able, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []
    });

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;
}

1;

__END__

=head1 NAME

MogileFS::Store - data storage provider. base class.

=head1 ABOUT

MogileFS aims to be database-independent (though currently as of late
2006 only works with MySQL). In the future, the server will create a
singleton instance of type "MogileFS::Store", like
L<MogileFS::Store::MySQL>, and all database interaction will be
through it.

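For illustration only (not an official synopsis): once the server has built
its store object, other code talks to the database through that object's
methods, along these lines (C<$store> is assumed to be the already-configured
instance):

    my $rows = $store->old_tempfiles(3600);       # tempfile rows older than an hour
    my $max  = $store->max_fidid;                 # highest fid in the file table
    $store->enqueue_fids_to_delete2(123, 456);    # queue fids for deletion
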
=head1 SEE ALSO

L<MogileFS::Store::MySQL>