1 package MogileFS
::HTTPFile
;
5 use Socket
qw(PF_INET IPPROTO_TCP SOCK_STREAM);
8 use MogileFS
::Util
qw(error undeferr wait_for_readability wait_for_writeability);
10 # (caching the connection used for HEAD requests)
13 my %size_check_retry_after; # host => $hirestime.
14 my %size_check_failcount; # host => $count.
16 my %sidechannel_nexterr; # host => next error log time
18 # create a new MogileFS::HTTPFile instance from a URL. not called
19 # "new" because I don't want to imply that it's creating anything.
21 my ($class, $url) = @_;
22 my $self = bless {}, $class;
24 unless ($url =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
37 return $self->{devid
} if $self->{devid
};
38 $self->{url
} =~ /\bdev(\d+)\b/
39 or die "Can't find device from URL: $self->{url}\n";
40 return $self->{devid
} = $1;
45 return $self->device->hostid;
48 # return MogileFS::Device object
51 return Mgd
::device_factory
()->get_by_id($self->device_id);
54 # return MogileFS::Host object
57 return $self->device->host;
60 # returns true on success, dies on failure
64 my ($host, $port) = ($self->{host
}, $self->{port
});
66 my $httpsock = IO
::Socket
::INET
->new(PeerAddr
=> $host, PeerPort
=> $port, Timeout
=> 2)
67 or die "can't connect to $host:$port in 2 seconds";
69 $httpsock->write("DELETE $self->{uri} HTTP/1.0\r\nConnection: keep-alive\r\n\r\n");
74 while (defined (my $line = <$httpsock>)) {
75 $line =~ s/[\s\r\n]+$//;
76 last unless length $line;
77 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
79 # make sure we get a good response
80 if ($rescode == 404 && $opts{ignore_missing
}) {
84 unless ($rescode == 204) {
85 die "Bad response from $host:$port: [$line]";
90 die "Unexpected HTTP response line during DELETE from $host:$port: [$line]" unless $did_del;
92 die "Didn't get valid HTTP response during DELETE from $host:port" unless $did_del;
97 # returns size of file, (doing a HEAD request and looking at content-length)
98 # returns -1 on file missing (404),
99 # returns undef on connectivity error
100 use constant FILE_MISSING
=> -1;
104 return $self->{_size
} if defined $self->{_size
};
106 my ($host, $port, $uri, $path) = map { $self->{$_} } qw(host port uri url);
108 return undef if (exists $size_check_retry_after{$host}
109 && $size_check_retry_after{$host} > Time
::HiRes
::time());
111 my $node_timeout = MogileFS
->config("node_timeout");
112 # Hardcoded connection cache size of 20 :(
113 $user_agent ||= LWP
::UserAgent
->new(timeout
=> $node_timeout, keep_alive
=> 20);
114 my $res = $user_agent->head($path);
115 if ($res->is_success) {
116 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
117 my $size = $res->header('content-length');
118 if (! defined $size &&
119 $res->header('server') =~ m/^lighttpd/) {
120 # lighttpd 1.4.x (main release) does not return content-length for
125 $self->{_size
} = $size;
128 if ($res->code == 404) {
129 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
132 if ($res->message =~ m/connect:/) {
133 my $count = $size_check_failcount{$host};
135 $count *= 2 unless $count > 360;
136 $size_check_retry_after{$host} = Time
::HiRes
::time() + $count;
137 $size_check_failcount{$host} = $count;
139 return undeferr
("Failed HEAD check for $path (" . $res->code . "): "
145 my ($self, $alg, $ping_cb, $reason) = @_;
146 my $mogconn = $self->host->mogstored_conn;
147 my $node_timeout = MogileFS
->config("node_timeout");
152 $reason = defined($reason) ?
" $reason" : "";
153 my $uri = $self->{uri
};
154 my $req = "$alg $uri$reason\r\n";
155 my $reqlen = length $req;
157 # a dead/stale socket may not be detected until we try to recv on it
158 # after sending a request
161 # assuming the storage node can checksum at >=2MB/s, low expectations here
162 my $response_timeout = $self->size / (2 * 1024 * 1024);
163 my $host = $self->{host
};
166 $sock = eval { $mogconn->sock($node_timeout) };
168 delete $sidechannel_nexterr{$host};
170 # avoid flooding logs with identical messages
172 my $next = $sidechannel_nexterr{$host} || 0;
174 return if $now < $next;
175 $sidechannel_nexterr{$host} = $now + 300;
176 return undeferr
("sidechannel failure on $alg $uri: $err");
179 $rv = send($sock, $req, 0);
180 if ($! || $rv != $reqlen) {
183 if ($retries-- <= 0) {
185 $err = $err ?
"send() error ($req): $err" :
186 "short send() ($req): $rv != $reqlen";
187 $err = $mogconn->{ip
} . ":" . $mogconn->{port
} . " $err";
188 return undeferr
($err);
193 $expiry = Time
::HiRes
::time() + $response_timeout;
194 while (!wait_for_readability
(fileno($sock), 1.0) &&
195 (Time
::HiRes
::time() < $expiry)) {
202 return undeferr
("EOF from mogstored") if ($retries-- <= 0);
204 } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=([a-f0-9]{32,128})\r\n/) {
207 my $checksum = eval {
208 MogileFS
::Checksum
->from_string(0, "$alg:$hexdigest")
210 return undeferr
("$alg failed for $uri: $@") if $@
;
211 return $checksum->{checksum
};
212 } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=-1\r\n/) {
213 # FIXME, this could be another error like EMFILE/ENFILE
215 } elsif ($rv =~ /^ERROR /) {
216 return; # old server, fallback to HTTP
218 return undeferr
("mogstored failed to handle ($alg $uri)");
222 my ($self, $alg, $ping_cb) = @_;
225 my $node_timeout = MogileFS
->config("node_timeout");
226 # Hardcoded connection cache size of 20 :(
227 $user_agent ||= LWP
::UserAgent
->new(timeout
=> $node_timeout, keep_alive
=> 20);
228 my $digest = Digest
->new($alg);
231 # default (4K) is tiny, use 1M like replicate
232 ':read_size_hint' => 0x100000,
233 ':content_cb' => sub {
239 my $path = $self->{url
};
240 my $res = $user_agent->get($path, %opts);
242 return $digest->digest if $res->is_success;
243 return FILE_MISSING
if $res->code == 404;
244 return undeferr
("Failed $alg (GET) check for $path (" . $res->code . "): "
249 my ($self, $alg, $ping_cb, $reason) = @_;
250 my $digest = $self->digest_mgmt($alg, $ping_cb, $reason);
252 return $digest if ($digest && $digest ne FILE_MISSING
);
254 $self->digest_http($alg, $ping_cb);