checksums: use a low-priority task queue for fsck digests
[MogileFS-Server.git] / lib / MogileFS / HTTPFile.pm
blobfa3d7a542b47c4761cb79c3100bbc0cb9eafc4e1
1 package MogileFS::HTTPFile;
2 use strict;
3 use warnings;
4 use Carp qw(croak);
5 use Socket qw(PF_INET IPPROTO_TCP SOCK_STREAM);
6 use Digest;
7 use MogileFS::Server;
8 use MogileFS::Util qw(error undeferr wait_for_readability wait_for_writeability);
10 # (caching the connection used for HEAD requests)
11 my $user_agent;
13 my %size_check_retry_after; # host => $hirestime.
14 my %size_check_failcount; # host => $count.
# Create a new MogileFS::HTTPFile instance from a URL.  Not called
# "new" because I don't want to imply that it's creating anything.
#
# $url must look like http://HOST[:PORT]/PATH; anything else croaks
# with "Bogus URL.".  The returned object carries the url, host, port
# and uri (path) fields.  When the URL names no port, port falls back
# to 80 so that raw-socket users such as delete() never see an
# undefined port.
sub at {
    my ($class, $url) = @_;
    my $self = bless {}, $class;

    unless ($url =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
        croak "Bogus URL.\n";
    }

    $self->{url}  = $url;
    $self->{host} = $1;
    $self->{port} = defined $2 ? $2 : 80;
    $self->{uri}  = $3;
    return $self;
}
# Return the device id taken from the "devNNN" word in the URL,
# remembering it in $self->{devid} for later calls.  Dies if the URL
# contains no such word.
sub device_id {
    my ($self) = @_;

    my $cached = $self->{devid};
    return $cached if $cached;

    if ($self->{url} =~ /\bdev(\d+)\b/) {
        $self->{devid} = $1;
        return $self->{devid};
    }

    die "Can't find device from URL: $self->{url}\n";
}
# Return the host id of the device this file lives on, as reported by
# the device object's hostid method.
sub host_id {
    my ($self) = @_;
    my $device = $self->device;
    return $device->hostid;
}
# Look up and return the MogileFS::Device object for this file, by
# asking the device factory for the id found in the URL.
sub device {
    my ($self) = @_;
    my $devid   = $self->device_id;
    my $factory = Mgd::device_factory();
    return $factory->get_by_id($devid);
}
# Return the MogileFS::Host object that owns this file's device.
sub host {
    my ($self) = @_;
    my $device = $self->device;
    return $device->host;
}
# Delete the file from its storage node with an HTTP DELETE request.
#
# Options (passed as a key/value list):
#   ignore_missing - treat a 404 response as a successful delete
#
# Returns true on success, dies on failure: when the connection cannot
# be made within 2 seconds, when the status is anything other than 204
# (or 404 with ignore_missing), or when no HTTP status line is seen.
sub delete {
    my ($self, %opts) = @_;
    my ($host, $port) = ($self->{host}, $self->{port});

    my $httpsock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, Timeout => 2)
        or die "can't connect to $host:$port in 2 seconds";

    $httpsock->write("DELETE $self->{uri} HTTP/1.0\r\nConnection: keep-alive\r\n\r\n");

    my $did_del = 0;

    # read the response headers line by line, stopping at the blank line
    while (defined (my $line = <$httpsock>)) {
        $line =~ s/[\s\r\n]+$//;
        last unless length $line;
        if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
            my $rescode = $1;
            # make sure we get a good response
            if ($rescode == 404 && $opts{ignore_missing}) {
                $did_del = 1;
                next;
            }
            unless ($rescode == 204) {
                die "Bad response from $host:$port: [$line]";
            }
            $did_del = 1;
            next;
        }
        # any header line seen before an acceptable status line is an error
        die "Unexpected HTTP response line during DELETE from $host:$port: [$line]" unless $did_del;
    }
    die "Didn't get valid HTTP response during DELETE from $host:$port" unless $did_del;

    return 1;
}
# Returns the size of the file, doing a HEAD request and looking at
# content-length.
# Returns FILE_MISSING (-1) when the file is missing (404).
# Returns undef on connectivity error, or via undeferr() for any other
# failed HEAD.
#
# A host whose HEAD fails with a "connect:" message is put on a backoff:
# further size checks for that host return undef immediately until the
# retry time has passed.  The delay doubles on each consecutive failure
# (2s, 4s, ...) and stops growing once it exceeds 360 seconds; any
# success or 404 from the host resets it.
use constant FILE_MISSING => -1;
sub size {
    my $self = shift;
    my ($host, $path) = @{$self}{qw(host url)};

    return undef if (exists $size_check_retry_after{$host}
        && $size_check_retry_after{$host} > Time::HiRes::time());

    # don't SIGPIPE us
    my $flag_nosignal = MogileFS::Sys->flag_nosignal;
    local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;

    my $node_timeout = MogileFS->config("node_timeout");
    # Hardcoded connection cache size of 20 :(
    $user_agent ||= LWP::UserAgent->new(timeout => $node_timeout, keep_alive => 20);
    my $res = $user_agent->head($path);

    if ($res->is_success) {
        delete $size_check_failcount{$host};
        my $size = $res->header('content-length');
        return $size if defined $size;

        # lighttpd 1.4.x (main release) does not return content-length for
        # 0 byte files.  The Server header is optional, so check that it
        # is present before matching against it.
        my $server = $res->header('server');
        return 0 if defined $server && $server =~ m/^lighttpd/;

        return $size;
    }

    if ($res->code == 404) {
        delete $size_check_failcount{$host};
        return FILE_MISSING;
    }

    if ($res->message =~ m/connect:/) {
        my $count = $size_check_failcount{$host};
        $count ||= 1;
        $count *= 2 unless $count > 360;
        $size_check_retry_after{$host} = Time::HiRes::time() + $count;
        $size_check_failcount{$host} = $count;
    }

    return undeferr("Failed HEAD check for $path (" . $res->code . "): "
                    . $res->message);
}
# Ask the mogstored connection of this file's host to compute a digest
# of the file.
#
#   $alg     - digest algorithm name; sent as the first word of the request
#   $ping_cb - code ref invoked roughly once a second while waiting for
#              the reply
#   $reason  - optional word appended to the request line
#
# Returns:
#   - the checksum field of the MogileFS::Checksum object parsed from
#     the hex digest in the reply, on success
#   - FILE_MISSING when the reply carries -1 in place of a digest
#   - an empty return when no socket could be obtained or the reply
#     starts with "ERROR " (callers then fall back to HTTP)
#   - the result of undeferr() on send failure, EOF, a digest that fails
#     to parse, or an unrecognized reply
#
# The request is attempted up to three times (two retries) because a
# dead/stale socket may only be noticed once we send or read on it.
sub digest_mgmt {
    my ($self, $alg, $ping_cb, $reason) = @_;
    my $mogconn = $self->host->mogstored_conn;
    my $node_timeout = MogileFS->config("node_timeout");

    $reason = defined($reason) ? " $reason" : "";
    my $uri = $self->{uri};
    my $req = "$alg $uri$reason\r\n";
    my $reqlen = length $req;

    # a dead/stale socket may not be detected until we try to recv on it
    # after sending a request
    my $retries = 2;

    # assuming the storage node can checksum at >=2MB/s, low expectations here.
    # size() returns undef or FILE_MISSING when the HEAD check fails; count
    # those as zero bytes rather than doing arithmetic on them.
    my $size = $self->size;
    $size = 0 unless defined $size && $size > 0;
    my $response_timeout = $size / (2 * 1024 * 1024);

    my $flag_nosignal = MogileFS::Sys->flag_nosignal;
    local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;

    my $rv;
    ATTEMPT: while (1) {
        my $sock = $mogconn->sock($node_timeout) or return;

        # send() returns undef on error; $! is only meaningful in that
        # case, so do not consult it after a successful call.
        my $sent = send($sock, $req, $flag_nosignal);
        unless (defined $sent && $sent == $reqlen) {
            # capture errno text before mark_dead can disturb $!
            my $errno = defined $sent ? undef : "$!";
            $mogconn->mark_dead;
            next ATTEMPT if $retries-- > 0;

            (my $printable = $req) =~ tr/\r\n//d;
            my $err = defined $errno
                    ? "send() error ($printable): $errno"
                    : "short send() ($printable): $sent != $reqlen";
            return undeferr($mogconn->{ip} . ":" . $mogconn->{port} . " $err");
        }

        # keep the caller alive while the storage node is busy hashing
        my $expiry = Time::HiRes::time() + $response_timeout;
        while (!wait_for_readability(fileno($sock), 1.0) &&
               (Time::HiRes::time() < $expiry)) {
            $ping_cb->();
        }

        $rv = <$sock>;
        last ATTEMPT if $rv;

        $mogconn->mark_dead;
        return undeferr("EOF from mogstored") if ($retries-- <= 0);
    }

    # The value after "=" is either a hex digest or -1 (FILE_MISSING);
    # the pattern has to admit both for the missing-file case to be seen.
    if ($rv =~ /^\Q$uri\E \Q$alg\E=(-1|[a-f0-9]{32,128})\r\n/) {
        my $hexdigest = $1;

        if ($hexdigest eq FILE_MISSING) {
            # FIXME, this could be another error like EMFILE/ENFILE
            return FILE_MISSING;
        }

        my $checksum = eval {
            MogileFS::Checksum->from_string(0, "$alg:$hexdigest")
        };
        return undeferr("$alg failed for $uri: $@") if $@;
        return $checksum->{checksum};
    } elsif ($rv =~ /^ERROR /) {
        return; # old server, fallback to HTTP
    }

    return undeferr("mogstored failed to handle ($alg $uri)");
}
# Compute a digest of the file by downloading it with an HTTP GET and
# feeding each chunk to a Digest object for algorithm $alg.
#
#   $alg     - algorithm name handed to Digest->new
#   $ping_cb - code ref invoked after every chunk received
#
# Returns the result of the Digest object's digest method on success,
# FILE_MISSING on a 404, and the result of undeferr() for any other
# failure, including a transfer aborted because the chunk callback died.
sub digest_http {
    my ($self, $alg, $ping_cb) = @_;

    # don't SIGPIPE us (why don't we just globally ignore SIGPIPE?)
    my $flag_nosignal = MogileFS::Sys->flag_nosignal;
    local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;

    # TODO: refactor
    my $node_timeout = MogileFS->config("node_timeout");
    # Hardcoded connection cache size of 20 :(
    $user_agent ||= LWP::UserAgent->new(timeout => $node_timeout, keep_alive => 20);
    my $digest = Digest->new($alg);

    my %opts = (
        # default (4K) is tiny, use 1M like replicate
        ':read_size_hint' => 0x100000,
        ':content_cb' => sub {
            $digest->add($_[0]);
            $ping_cb->();
        },
    );

    my $path = $self->{url};
    my $res = $user_agent->get($path, %opts);

    if ($res->is_success) {
        # When the content callback dies, LWP aborts the transfer and
        # records the exception in X-Died; the status can still be a
        # success, so a digest of the partial body must not be returned.
        my $died = $res->header('X-Died');
        if (defined $died) {
            return undeferr("Failed $alg (GET) check for $path (" . $res->code
                            . "): aborted: $died");
        }
        return $digest->digest;
    }

    return FILE_MISSING if $res->code == 404;
    return undeferr("Failed $alg (GET) check for $path (" . $res->code . "): "
                    . $res->message);
}
# Compute the digest of the file for algorithm $alg.  The mogstored
# path (digest_mgmt) is tried first; when it yields a false value or
# FILE_MISSING, the answer comes from an HTTP GET (digest_http) instead.
# $ping_cb and $reason are passed through to the helpers.
sub digest {
    my ($self, $alg, $ping_cb, $reason) = @_;

    my $from_mgmt = $self->digest_mgmt($alg, $ping_cb, $reason);
    my $usable    = $from_mgmt && $from_mgmt ne FILE_MISSING;
    return $from_mgmt if $usable;

    return $self->digest_http($alg, $ping_cb);
}