httpfile: correct FILE_MISSING check in digest_mgmt
[MogileFS-Server.git] / lib / MogileFS / HTTPFile.pm
blob4f1370f91b0979559080ede74e4edd6be6e09e0a
1 package MogileFS::HTTPFile;
2 use strict;
3 use warnings;
4 use Carp qw(croak);
5 use Socket qw(PF_INET IPPROTO_TCP SOCK_STREAM);
6 use Digest;
7 use MogileFS::Server;
8 use MogileFS::Util qw(error undeferr wait_for_readability wait_for_writeability);
10 # (caching the connection used for HEAD requests)
11 my $user_agent;
13 my %size_check_retry_after; # host => $hirestime.
14 my %size_check_failcount; # host => $count.
16 my %sidechannel_nexterr; # host => next error log time
18 # create a new MogileFS::HTTPFile instance from a URL. not called
19 # "new" because I don't want to imply that it's creating anything.
20 sub at {
21 my ($class, $url) = @_;
22 my $self = bless {}, $class;
24 unless ($url =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
25 croak "Bogus URL.\n";
28 $self->{url} = $url;
29 $self->{host} = $1;
30 $self->{port} = $2;
31 $self->{uri} = $3;
32 return $self;
35 sub device_id {
36 my $self = shift;
37 return $self->{devid} if $self->{devid};
38 $self->{url} =~ /\bdev(\d+)\b/
39 or die "Can't find device from URL: $self->{url}\n";
40 return $self->{devid} = $1;
43 sub host_id {
44 my $self = shift;
45 return $self->device->hostid;
48 # return MogileFS::Device object
49 sub device {
50 my $self = shift;
51 return Mgd::device_factory()->get_by_id($self->device_id);
54 # return MogileFS::Host object
55 sub host {
56 my $self = shift;
57 return $self->device->host;
60 # returns true on success, dies on failure
61 sub delete {
62 my $self = shift;
63 my %opts = @_;
64 my ($host, $port) = ($self->{host}, $self->{port});
66 my $httpsock = IO::Socket::INET->new(PeerAddr => $host, PeerPort => $port, Timeout => 2)
67 or die "can't connect to $host:$port in 2 seconds";
69 $httpsock->write("DELETE $self->{uri} HTTP/1.0\r\nConnection: keep-alive\r\n\r\n");
71 my $keep_alive = 0;
72 my $did_del = 0;
74 while (defined (my $line = <$httpsock>)) {
75 $line =~ s/[\s\r\n]+$//;
76 last unless length $line;
77 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
78 my $rescode = $1;
79 # make sure we get a good response
80 if ($rescode == 404 && $opts{ignore_missing}) {
81 $did_del = 1;
82 next;
84 unless ($rescode == 204) {
85 die "Bad response from $host:$port: [$line]";
87 $did_del = 1;
88 next;
90 die "Unexpected HTTP response line during DELETE from $host:$port: [$line]" unless $did_del;
92 die "Didn't get valid HTTP response during DELETE from $host:port" unless $did_del;
94 return 1;
97 # returns size of file, (doing a HEAD request and looking at content-length)
98 # returns -1 on file missing (404),
99 # returns undef on connectivity error
100 use constant FILE_MISSING => -1;
101 sub size {
102 my $self = shift;
104 return $self->{_size} if defined $self->{_size};
106 my ($host, $port, $uri, $path) = map { $self->{$_} } qw(host port uri url);
108 return undef if (exists $size_check_retry_after{$host}
109 && $size_check_retry_after{$host} > Time::HiRes::time());
111 my $node_timeout = MogileFS->config("node_timeout");
112 # Hardcoded connection cache size of 20 :(
113 $user_agent ||= LWP::UserAgent->new(timeout => $node_timeout, keep_alive => 20);
114 my $res = $user_agent->head($path);
115 if ($res->is_success) {
116 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
117 my $size = $res->header('content-length');
118 if (! defined $size &&
119 $res->header('server') =~ m/^lighttpd/) {
120 # lighttpd 1.4.x (main release) does not return content-length for
121 # 0 byte files.
122 $self->{_size} = 0;
123 return 0;
125 $self->{_size} = $size;
126 return $size;
127 } else {
128 if ($res->code == 404) {
129 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
130 return FILE_MISSING;
132 if ($res->message =~ m/connect:/) {
133 my $count = $size_check_failcount{$host};
134 $count ||= 1;
135 $count *= 2 unless $count > 360;
136 $size_check_retry_after{$host} = Time::HiRes::time() + $count;
137 $size_check_failcount{$host} = $count;
139 return undeferr("Failed HEAD check for $path (" . $res->code . "): "
140 . $res->message);
144 sub digest_mgmt {
145 my ($self, $alg, $ping_cb, $reason) = @_;
146 my $mogconn = $self->host->mogstored_conn;
147 my $node_timeout = MogileFS->config("node_timeout");
148 my $sock;
149 my $rv;
150 my $expiry;
152 $reason = defined($reason) ? " $reason" : "";
153 my $uri = $self->{uri};
154 my $req = "$alg $uri$reason\r\n";
155 my $reqlen = length $req;
157 # a dead/stale socket may not be detected until we try to recv on it
158 # after sending a request
159 my $retries = 2;
161 # assuming the storage node can checksum at >=2MB/s, low expectations here
162 my $response_timeout = $self->size / (2 * 1024 * 1024);
163 my $host = $self->{host};
165 retry:
166 $sock = eval { $mogconn->sock($node_timeout) };
167 if (defined $sock) {
168 delete $sidechannel_nexterr{$host};
169 } else {
170 # avoid flooding logs with identical messages
171 my $err = $@;
172 my $next = $sidechannel_nexterr{$host} || 0;
173 my $now = time();
174 return if $now < $next;
175 $sidechannel_nexterr{$host} = $now + 300;
176 return undeferr("sidechannel failure on $alg $uri: $err");
179 $rv = send($sock, $req, 0);
180 if ($! || $rv != $reqlen) {
181 my $err = $!;
182 $mogconn->mark_dead;
183 if ($retries-- <= 0) {
184 $req =~ tr/\r\n//d;
185 $err = $err ? "send() error ($req): $err" :
186 "short send() ($req): $rv != $reqlen";
187 $err = $mogconn->{ip} . ":" . $mogconn->{port} . " $err";
188 return undeferr($err);
190 goto retry;
193 $expiry = Time::HiRes::time() + $response_timeout;
194 while (!wait_for_readability(fileno($sock), 1.0) &&
195 (Time::HiRes::time() < $expiry)) {
196 $ping_cb->();
199 $rv = <$sock>;
200 if (! $rv) {
201 $mogconn->mark_dead;
202 return undeferr("EOF from mogstored") if ($retries-- <= 0);
203 goto retry;
204 } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=([a-f0-9]{32,128})\r\n/) {
205 my $hexdigest = $1;
207 my $checksum = eval {
208 MogileFS::Checksum->from_string(0, "$alg:$hexdigest")
210 return undeferr("$alg failed for $uri: $@") if $@;
211 return $checksum->{checksum};
212 } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=-1\r\n/) {
213 # FIXME, this could be another error like EMFILE/ENFILE
214 return FILE_MISSING;
215 } elsif ($rv =~ /^ERROR /) {
216 return; # old server, fallback to HTTP
218 return undeferr("mogstored failed to handle ($alg $uri)");
221 sub digest_http {
222 my ($self, $alg, $ping_cb) = @_;
224 # TODO: refactor
225 my $node_timeout = MogileFS->config("node_timeout");
226 # Hardcoded connection cache size of 20 :(
227 $user_agent ||= LWP::UserAgent->new(timeout => $node_timeout, keep_alive => 20);
228 my $digest = Digest->new($alg);
230 my %opts = (
231 # default (4K) is tiny, use 1M like replicate
232 ':read_size_hint' => 0x100000,
233 ':content_cb' => sub {
234 $digest->add($_[0]);
235 $ping_cb->();
239 my $path = $self->{url};
240 my $res = $user_agent->get($path, %opts);
242 return $digest->digest if $res->is_success;
243 return FILE_MISSING if $res->code == 404;
244 return undeferr("Failed $alg (GET) check for $path (" . $res->code . "): "
245 . $res->message);
248 sub digest {
249 my ($self, $alg, $ping_cb, $reason) = @_;
250 my $digest = $self->digest_mgmt($alg, $ping_cb, $reason);
252 return $digest if ($digest && $digest ne FILE_MISSING);
254 $self->digest_http($alg, $ping_cb);