1 package MogileFS
::Store
::Postgres
;
2 # vim: ts=4 sw=4 et ft=perl:
4 use Digest
::MD5
qw(md5); # Used for lockid
8 use MogileFS
::Util
qw(throw debug error);
11 use base
'MogileFS::Store';
13 # --------------------------------------------------------------------------
14 # Package methods we override
15 # --------------------------------------------------------------------------
18 my ($class, $dbname, $host, $port) = @_;
19 return "DBI:Pg:dbname=$dbname;host=$host" . ($port ?
";port=$port" : "");
23 my ($class, $dbname, $host, $port) = @_;
24 return $class->dsn_of_dbhost('postgres', $host, $port);
27 # --------------------------------------------------------------------------
28 # Store-related things we override
29 # --------------------------------------------------------------------------
# Capability flag: always returns 1 (true).
# NOTE(review): presumably read by the parent store to turn on DBI
# RaiseError for this backend -- confirm against MogileFS::Store.
sub want_raise_errors {
    return 1;
}
# Given a root DBI connection, create the named database. Succeeds
# if it is created or already exists; dies otherwise.
35 sub create_db_if_not_exists
{
36 my ($pkg, $rdbh, $dbname) = @_;
37 if(not $rdbh->do("CREATE DATABASE $dbname")) {
38 die "Failed to create database '$dbname': " . $rdbh->errstr . "\n" if ($rdbh->errstr !~ /already exists/);
42 sub grant_privileges
{
43 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
45 $rdbh->do("CREATE ROLE $user LOGIN PASSWORD ?",
# SQLSTATE is a five-character string (it may contain letters), so it is
# compared with a string operator. 42710 is PostgreSQL's duplicate_object:
# the role already exists, which is not treated as a failure.
die "Failed to create user '$user': ". $rdbh->errstr . "\n"
if $rdbh->err && $rdbh->state ne '42710';
# Owning the database in postgres is important
51 $rdbh->do("ALTER DATABASE $dbname OWNER TO $user")
52 or die "Failed to grant privileges " . $rdbh->errstr . "\n";
# Capability flag: always returns 0 (false).
# NOTE(review): presumably signals that "INSERT IGNORE" is unavailable on
# this backend -- confirm against the parent class.
sub can_insertignore {
    return 0;
}
# Capability flag: always returns 0 (false).
# NOTE(review): presumably signals that multi-row INSERT statements are not
# used with this backend -- confirm against the parent class.
sub can_insert_multi {
    return 0;
}
# Returns the SQL expression (as a string, to be spliced into statements)
# that yields the current time as integer seconds since the epoch.
sub unix_timestamp {
    return 'EXTRACT(epoch FROM NOW())::int4';
}
63 my $database_version = $self->dbh->get_info(18); # SQL_DBMS_VER
64 # We need >=pg-8.2 because we use SAVEPOINT and ROLLBACK TO.
# Reject servers older than 8.2 (major 00-07, or 08.00 / 08.01). The
# alternation is grouped so that \A anchors BOTH branches; otherwise
# "08.00"/"08.01" could match anywhere inside the version string.
die "Postgres is too old! Must use >=postgresql-8.2!" if($database_version =~ /\A(?:0[0-7]\.|08\.0[01])/);
66 $self->{lock_depth
} = 0;
69 sub post_dbi_connect
{
71 $self->SUPER::post_dbi_connect
;
72 $self->{lock_depth
} = 0;
# Capability flag: always returns 0 (false) -- slave databases are not
# supported by this backend yet (see the TODO that follows).
sub can_do_slaves {
    return 0;
}
77 # TODO: Implement later
81 sub was_duplicate_error
{
84 return 0 unless $dbh->err;
# 23505 is PostgreSQL's unique_violation. SQLSTATE values are strings that
# may contain letters, so compare with eq rather than numeric ==.
return 1 if $dbh->state eq '23505' || $dbh->errstr =~ /duplicate/i;
89 my ($self, $table) = @_;
91 my $sth = $self->dbh->table_info(undef, undef, $table, "table");
92 my $rec = $sth->fetchrow_hashref;
99 $self->add_extra_tables('lock');
100 return $self->SUPER::setup_database
;
103 sub filter_create_sql
{
104 my ($self, $sql) = @_;
105 $sql =~ s/\bUNSIGNED\b//g;
106 $sql =~ s/\b(?:TINY|MEDIUM)INT\b/SMALLINT/g;
107 $sql =~ s/\bINT\s+NOT\s+NULL\s+AUTO_INCREMENT\b/SERIAL/g;
110 my ($table) = $sql =~ /create\s+table\s+(\S+)/i;
111 die "didn't find table" unless $table;
112 if ($self->can("INDEXES_$table")) {
113 $sql =~ s!,\s+INDEX\s*\(.+?\)!!mg;
124 dmid SMALLINT NOT NULL,
125 dkey VARCHAR(255), -- domain-defined
128 length INT, -- 2TiB limit
131 classid SMALLINT NOT NULL,
132 devcount SMALLINT NOT NULL
137 "CREATE INDEX file_devcount ON file (dmid,classid,devcount)"
140 sub INDEXES_unreachable_fids
{
141 "CREATE INDEX unreachable_fids_lastupdate ON unreachable_fids (lastupdate)"
144 sub INDEXES_file_on
{
145 "CREATE INDEX file_on_devid ON file_on (devid)"
150 hostid SMALLINT NOT NULL,
151 PRIMARY KEY (hostid),
155 CHECK (status IN ('alive','dead','down')),
157 http_port INT DEFAULT 7500,
158 CHECK (http_port >= 0),
159 CHECK (http_port < 65536),
162 CHECK (http_get_port >= 0),
163 CHECK (http_get_port < 65536),
165 hostname VARCHAR(40),
176 "CREATE TABLE device (
177 devid SMALLINT NOT NULL,
181 hostid SMALLINT NOT NULL,
184 CHECK (status IN ('alive','dead','down','readonly','drain')),
185 weight INT DEFAULT 100,
188 CHECK (mb_total >= 0),
190 CHECK (mb_used >= 0),
197 "CREATE INDEX device_status ON device (status)"
200 sub INDEXES_file_to_replicate
{
201 "CREATE INDEX file_to_replicate_nexttry ON file_to_replicate (nexttry)"
204 sub INDEXES_file_to_delete_later
{
205 "CREATE INDEX file_to_delete_later_delafter ON file_to_delete_later (delafter)"
208 sub INDEXES_fsck_log
{
209 "CREATE INDEX fsck_log_utime ON fsck_log (utime)"
212 sub TABLE_file_to_queue
{
213 "CREATE TABLE file_to_queue (
214 fid INT UNSIGNED NOT NULL PRIMARY KEY,
216 type TINYINT UNSIGNED NOT NULL,
217 nexttry INT UNSIGNED NOT NULL,
218 failcount TINYINT UNSIGNED NOT NULL default '0',
219 flags SMALLINT UNSIGNED NOT NULL default '0',
223 sub INDEXES_file_to_queue
{
224 "CREATE INDEX type_nexttry ON file_to_queue (type,nexttry)"
231 PRIMARY KEY (lockid),
234 hostname VARCHAR(255) NOT NULL,
239 acquiredat INT NOT NULL,
240 CHECK (acquiredat >= 0)
244 sub upgrade_add_host_getport
{
246 # see if they have the get port, else update it
247 unless ($self->column_type("host", "http_get_port")) {
248 $self->dowell("ALTER TABLE host ADD COLUMN http_get_port INT CHECK(http_get_port >= 0)");
252 sub upgrade_add_host_altip
{
254 unless ($self->column_type("host", "altip")) {
255 $self->dowell("ALTER TABLE host ADD COLUMN altip VARCHAR(15)");
256 $self->dowell("ALTER TABLE host ADD COLUMN altmask VARCHAR(18)");
# PostgreSQL names a unique constraint with ADD CONSTRAINT <name> UNIQUE (...);
# the MySQL-style "ADD UNIQUE <name> (...)" form is a syntax error here.
$self->dowell("ALTER TABLE host ADD CONSTRAINT altip UNIQUE (altip)");
261 sub upgrade_add_device_asof
{
263 unless ($self->column_type("device", "mb_asof")) {
264 $self->dowell("ALTER TABLE device ADD COLUMN mb_asof INT CHECK(mb_asof >= 0)");
268 sub upgrade_add_device_weight
{
270 unless ($self->column_type("device", "weight")) {
271 $self->dowell("ALTER TABLE device ADD COLUMN weight INT DEFAULT 100");
275 sub upgrade_add_device_readonly
{
277 unless ($self->column_constraint("device", "status") =~ /readonly/) {
278 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly'))");
282 sub upgrade_add_device_drain
{
284 unless ($self->column_constraint("device", "status") =~ /drain/) {
285 $self->dowell("ALTER TABLE device MODIFY COLUMN status VARCHAR(8) CHECK(status IN ('alive', 'dead', 'down', 'readonly','drain'))");
289 # return 1 on success. die otherwise.
290 sub enqueue_fids_to_delete
{
291 my ($self, @fidids) = @_;
292 my $sql = "INSERT INTO file_to_delete (fid) VALUES (?)";
293 my $savepoint_name = "savepoint_enqueue_fids_to_delete";
295 $self->dbh->begin_work;
296 foreach my $fidid (@fidids) {
297 $self->dbh->do('SAVEPOINT '.$savepoint_name);
300 $self->dbh->do($sql, undef, $fidid);
302 if ($@
|| $self->dbh->err) {
303 if ($self->was_duplicate_error) {
304 $self->dbh->do('ROLLBACK TO '.$savepoint_name);
313 # --------------------------------------------------------------------------
314 # Functions specific to Store::Postgres subclass. Not in parent.
315 # --------------------------------------------------------------------------
317 sub insert_or_ignore
{
319 my %arg = $self->_valid_params([qw(insert insert_vals)], @_);
320 return $self->insert_or_update(
321 insert
=> $arg{insert
},
322 insert_vals
=> $arg{insert_vals
},
324 update_vals
=> 'IGNORE',
328 sub insert_or_update
{
330 my %arg = $self->_valid_params([qw(insert update insert_vals update_vals)], @_);
331 my $dbh = $self->dbh;
332 my $savepoint_name = $arg{insert
};
333 $savepoint_name =~ s/^INSERT INTO ([^\s]+).*$/$1/g;
336 $dbh->do('SAVEPOINT '.$savepoint_name);
338 $dbh->do($arg{insert
}, undef, @
{ $arg{insert_vals
} });
340 if ($@
|| $dbh->err) {
341 if ($self->was_duplicate_error) {
342 $dbh->do('ROLLBACK TO '.$savepoint_name);
343 if($arg{update
} ne "IGNORE") {
344 $dbh->do($arg{update
}, undef, @
{ $arg{update_vals
} });
355 my ($self, $table, $col) = @_;
356 my $sth = $self->dbh->prepare("SELECT column_name,data_type FROM information_schema.columns WHERE table_name=? AND column_name=?");
357 $sth->execute($table,$col);
358 while (my $rec = $sth->fetchrow_hashref) {
359 if ($rec->{column_name
} eq $col) {
361 return $rec->{data_type
};
367 sub column_constraint
{
368 my ($self, $table, $col) = @_;
369 my $sth = $self->dbh->prepare("SELECT column_name,information_schema.check_constraints.check_clause FROM information_schema.constraint_column_usage JOIN information_schema.check_constraints USING(constraint_catalog,constraint_schema,constraint_name) WHERE table_name=? AND column_name=?");
370 $sth->execute($table,$col);
371 while (my $rec = $sth->fetchrow_hashref) {
372 if ($rec->{column_name
} eq $col) {
374 return $rec->{check_clause
};
380 # --------------------------------------------------------------------------
381 # Test suite things we override
382 # --------------------------------------------------------------------------
385 my $dbname = "tmp_mogiletest";
388 system("$FindBin::Bin/../mogdbsetup", "--yes", "--dbname=$dbname", "--type=Postgres", "--dbrootuser=postgres")
389 and die "Failed to run mogdbsetup ($FindBin::Bin/../mogdbsetup).";
391 return MogileFS
::Store
->new_from_dsn_user_pass("dbi:Pg:dbname=$dbname",
398 return $rootdbh ||= DBI
->connect("DBI:Pg:dbname=postgres", "postgres", "", { RaiseError
=> 1 })
399 or die "Couldn't connect to local PostgreSQL database as postgres";
404 my $root_dbh = _root_dbh
();
406 $root_dbh->do("DROP DATABASE $dbname");
411 # --------------------------------------------------------------------------
412 # Data-access things we override
413 # --------------------------------------------------------------------------
415 # return new classid on success (non-zero integer), die on failure
416 # throw 'dup' on duplicate name
417 # TODO: add locks around entire table
419 my ($self, $dmid, $classname) = @_;
420 my $dbh = $self->dbh;
422 # get the max class id in this domain
423 my $maxid = $dbh->selectrow_array
424 ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
426 # now insert the new class
428 $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
429 undef, $dmid, $maxid + 1, $classname, 2);
431 if ($@
|| $dbh->err) {
432 # first is error code for duplicates
433 if ($self->was_duplicate_error) {
437 return $maxid + 1 if $rv;
442 # returns 1 on success, 0 on duplicate key error, dies on exception
443 # TODO: need a test to hit the duplicate name error condition
445 my ($self, $fidid, $to_key) = @_;
446 my $dbh = $self->dbh;
448 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
449 undef, $to_key, $fidid);
451 if ($@
|| $dbh->err) {
452 # first is error code for duplicates
453 if ($self->was_duplicate_error) {
463 # add a record of fidid existing on devid
464 # returns 1 on success, 0 on duplicate
465 sub add_fidid_to_devid
{
466 my ($self, $fidid, $devid) = @_;
467 my $dbh = $self->dbh;
469 $dbh->do("INSERT INTO file_on (fid, devid) VALUES (?, ?)", undef, $fidid, $devid);
472 return 1 if !$@
&& !$dbh->err;
476 # update the device count for a given fidid
477 sub update_devcount_atomic
{
478 my ($self, $fidid) = @_;
481 $self->dbh->begin_work;
482 $rv = $self->dbh->do("SELECT devcount FROM file WHERE fid=? FOR UPDATE", undef, $fidid);
485 $self->dbh->rollback;
488 $rv = $self->dbh->do("UPDATE file SET devcount=(SELECT COUNT(devid) FROM file_on WHERE fid=?) WHERE fid=?", undef, $fidid, $fidid);
495 sub should_begin_replicating_fidid
{
496 my ($self, $fidid) = @_;
497 my $lockname = "mgfs:fid:$fidid:replicate";
498 return 1 if $self->get_lock($lockname, 1);
502 sub note_done_replicating
{
503 my ($self, $fidid) = @_;
504 my $lockname = "mgfs:fid:$fidid:replicate";
505 $self->release_lock($lockname);
508 # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
509 sub enqueue_for_replication
{
510 my ($self, $fidid, $from_devid, $in) = @_;
511 my $dbh = $self->dbh;
515 $nexttry = $self->unix_timestamp." + ${in}::int";
519 $dbh->do("INSERT INTO file_to_replicate (fid, fromdevid, nexttry) VALUES (?, ?, $nexttry)",
520 undef, $fidid, $from_devid);
524 # reschedule all deferred replication, return number rescheduled
527 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." WHERE nexttry > ".$self->unix_timestamp);
530 sub reschedule_file_to_replicate_relative
{
531 my ($self, $fid, $in_n_secs) = @_;
532 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ".$self->unix_timestamp." + ?, failcount = failcount + 1 WHERE fid = ?",
533 undef, $in_n_secs, $fid);
536 # creates a new domain, given a domain namespace string. return the dmid on success,
537 # throw 'dup' on duplicate name.
539 my ($self, $name) = @_;
540 my $dbh = $self->dbh;
542 # get the max domain id
543 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
545 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
546 undef, $maxid + 1, $name);
548 if ($self->was_duplicate_error) {
551 return $maxid+1 if $rv;
552 die "failed to make domain"; # FIXME: the above is racy.
555 sub set_server_setting
{
556 my ($self, $key, $val) = @_;
557 my $dbh = $self->dbh;
560 $self->insert_or_update(
561 insert
=> "INSERT INTO server_settings (field, value) VALUES (?, ?)",
562 insert_vals
=> [ $key, $val ],
563 update
=> "UPDATE server_settings SET value = ? WHERE field = ?",
564 update_vals
=> [ $val, $key ],
567 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
570 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
574 # This implementation is race-safe
575 sub incr_server_setting
{
576 my ($self, $key, $val) = @_;
577 $val = 1 unless defined $val;
580 $self->dbh->begin_work;
581 my $value = $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=? FOR UPDATE",undef,$key);
583 if($value =~ /^\d+$/) {
586 warning
("Wanted to incr_server_setting by $val on field=$key but old value was $value. Setting instead.");
589 my $rv = $self->dbh->do("UPDATE server_settings ".
591 "WHERE field=?", undef,
596 $self->dbh->rollback; # Release the row-lock
597 $self->set_server_setting($key, $val);
600 # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
602 my ($self, $devid, $hostid, $status) = @_;
603 my $rv = $self->conddup(sub {
604 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?, ?, ?)", undef,
605 $devid, $hostid, $status);
608 die "error making device $devid\n" unless $rv > 0;
612 sub mark_fidid_unreachable
{
613 my ($self, $fidid) = @_;
614 my $dbh = $self->dbh;
617 $self->insert_or_update(
618 insert
=> "INSERT INTO unreachable_fids (fid, lastupdate) VALUES (?, ".$self->unix_timestamp.")",
619 insert_vals
=> [ $fidid ],
# The matching INSERT writes columns (fid, lastupdate), so the existing row
# must be located by fid; the previous "WHERE field = ?" named a column this
# statement never writes.
update => "UPDATE unreachable_fids SET lastupdate = ".$self->unix_timestamp." WHERE fid = ?",
621 update_vals
=> [ $fidid ],
627 my ($self, $fidid) = @_;
628 $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
630 $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
632 $self->insert_or_ignore(
633 insert
=> "INSERT INTO file_to_delete (fid) VALUES (?)",
634 insert_vals
=> [ $fidid ],
639 sub replace_into_file
{
641 my %arg = $self->_valid_params([qw(fidid dmid key length classid)], @_);
642 $self->insert_or_update(
643 insert
=> "INSERT INTO file (fid, dmid, dkey, length, classid, devcount) VALUES (?, ?, ?, ?, ?, 0)",
644 insert_vals
=> [ @arg{'fidid', 'dmid', 'key', 'length', 'classid'} ],
645 update
=> "UPDATE file SET dmid=?, dkey=?, length=?, classid=?, devcount=0 WHERE fid=?",
646 update_vals
=> [ @arg{'dmid', 'key', 'length', 'classid', 'fidid'} ],
651 # given an array of MogileFS::DevFID objects, mass-insert them all
652 # into file_on (ignoring if they're already present)
653 sub mass_insert_file_on
{
654 my ($self, @devfids) = @_;
655 my @qmarks = map { "(?,?)" } @devfids;
656 my @binds = map { $_->fidid, $_->devid } @devfids;
658 my $sth = $self->dbh->prepare("INSERT INTO file_on (fid, devid) VALUES (?, ?)");
661 $sth->execute($_->fidid, $_->devid);
663 $self->condthrow unless $self->was_duplicate_error;
669 croak
("Called with empty lockname! $lockname") unless (defined $lockname && length($lockname) > 0);
670 my $num = unpack 'N',md5
($lockname);
671 return ($num & 0x7fffffff);
674 # attempt to grab a lock of lockname, and timeout after timeout seconds.
675 # the lock should be unique in the space of (lockid), as well the space of
677 # returns 1 on success and 0 on timeout
679 my ($self, $lockname, $timeout) = @_;
680 my $lockid = lockid
($lockname);
681 die "Lock recursion detected (grabbing $lockname ($lockid), had $self->{last_lock} (".lockid
($self->{last_lock
})."). Bailing out." if $self->{lock_depth
};
683 debug
("$$ Locking $lockname ($lockid)\n") if $Mgd::DEBUG
>= 5;
686 while($timeout > 0 and not defined($lock)) {
687 $lock = eval { $self->dbh->do('INSERT INTO lock (lockid,hostname,pid,acquiredat) VALUES (?, ?, ?, '.$self->unix_timestamp().')', undef, $lockid, hostname
, $$) };
688 if($self->was_duplicate_error) {
694 #$lock = $self->dbh->selectrow_array("SELECT pg_try_advisory_lock(?, ?)", undef, $lockid, $timeout);
695 #warn("$$ Lock result=$lock\n");
696 if (defined $lock and $lock == 1) {
697 $self->{lock_depth
} = 1;
698 $self->{last_lock
} = $lockname;
700 die "Something went horribly wrong while getting lock $lockname";
706 # attempt to release a lock of lockname.
707 # returns 1 on success and 0 if no lock we have has that name.
709 my ($self, $lockname) = @_;
710 my $lockid = lockid
($lockname);
711 debug
("$$ Unlocking $lockname ($lockid)\n") if $Mgd::DEBUG
>= 5;
712 #my $rv = $self->dbh->selectrow_array("SELECT pg_advisory_unlock(?)", undef, $lockid);
713 my $rv = $self->dbh->do('DELETE FROM lock WHERE lockid=? AND pid=? AND hostname=?', undef, $lockid, $$, hostname
);
714 debug
("Double-release of lock $lockname!") if $self->{lock_depth
} != 0 and $rv == 0 and $Mgd::DEBUG
>= 2;
716 $self->{lock_depth
} = 0;
720 # return array of { dmid => ..., classid => ..., devcount => ..., count => ... }
721 sub get_stats_files_per_devcount
{
723 my $dbh = $self->dbh;
725 my $sth = $dbh->prepare('SELECT dmid, classid, t1.devcount, COUNT(t1.devcount) AS "count" '.
727 'SELECT fid, COUNT(devid) AS devcount FROM file_on GROUP BY fid'.
731 while (my $row = $sth->fetchrow_hashref) {
743 MogileFS::Store::Postgres - PostgreSQL data storage for MogileFS