1 package MogileFS
::HTTPFile
;
5 use Socket
qw(PF_INET IPPROTO_TCP SOCK_STREAM);
8 use MogileFS
::Util
qw(error undeferr wait_for_readability wait_for_writeability);
10 # (caching the connection used for HEAD requests)
# File-scoped, per-host backoff state used by the HEAD size check.
#
# %size_check_retry_after maps a host to a Time::HiRes::time() value; while
# that time is still in the future the size check returns undef for the host
# instead of issuing a request.
13 my %size_check_retry_after; # host => $hirestime.
# %size_check_failcount maps a host to its current backoff count.  On a
# "connect:" failure the count is doubled (unless it already exceeds 360) and
# added to the current time to produce the retry-after value; the entry is
# deleted again on a successful response or a 404.
14 my %size_check_failcount; # host => $count.
16 # create a new MogileFS::HTTPFile instance from a URL. not called
17 # "new" because I don't want to imply that it's creating anything.
19 my ($class, $url) = @_;
20 my $self = bless {}, $class;
22 unless ($url =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
35 return $self->{devid
} if $self->{devid
};
36 $self->{url
} =~ /\bdev(\d+)\b/
37 or die "Can't find device from URL: $self->{url}\n";
38 return $self->{devid
} = $1;
43 return $self->device->hostid;
46 # return MogileFS::Device object
49 return Mgd
::device_factory
()->get_by_id($self->device_id);
52 # return MogileFS::Host object
55 return $self->device->host;
58 # returns true on success, dies on failure
62 my ($host, $port) = ($self->{host
}, $self->{port
});
64 my $httpsock = IO
::Socket
::INET
->new(PeerAddr
=> $host, PeerPort
=> $port, Timeout
=> 2)
65 or die "can't connect to $host:$port in 2 seconds";
67 $httpsock->write("DELETE $self->{uri} HTTP/1.0\r\nConnection: keep-alive\r\n\r\n");
72 while (defined (my $line = <$httpsock>)) {
73 $line =~ s/[\s\r\n]+$//;
74 last unless length $line;
75 if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
77 # make sure we get a good response
78 if ($rescode == 404 && $opts{ignore_missing
}) {
82 unless ($rescode == 204) {
83 die "Bad response from $host:$port: [$line]";
88 die "Unexpected HTTP response line during DELETE from $host:$port: [$line]" unless $did_del;
# Reached once the response-reading loop has ended: fail if $did_del was never
# set.  The peer is reported as host:port with BOTH variables interpolated
# (the message previously printed the literal text ":port").
90 die "Didn't get valid HTTP response during DELETE from $host:$port" unless $did_del;
95 # returns size of file, (doing a HEAD request and looking at content-length)
96 # returns -1 on file missing (404),
97 # returns undef on connectivity error
# Sentinel (-1) meaning "the file is missing on the storage node".  It is
# returned when the GET-based digest check receives a 404, and digest results
# are compared against it with eq/ne before they are trusted.
98 use constant FILE_MISSING
=> -1;
101 my ($host, $port, $uri, $path) = map { $self->{$_} } qw(host port uri url);
103 return undef if (exists $size_check_retry_after{$host}
104 && $size_check_retry_after{$host} > Time
::HiRes
::time());
107 my $flag_nosignal = MogileFS
::Sys
->flag_nosignal;
108 local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;
110 my $node_timeout = MogileFS
->config("node_timeout");
111 # Hardcoded connection cache size of 20 :(
112 $user_agent ||= LWP
::UserAgent
->new(timeout
=> $node_timeout, keep_alive
=> 20);
113 my $res = $user_agent->head($path);
114 if ($res->is_success) {
115 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
116 my $size = $res->header('content-length');
117 if (! defined $size &&
118 $res->header('server') =~ m/^lighttpd/) {
119 # lighttpd 1.4.x (main release) does not return content-length for
125 if ($res->code == 404) {
126 delete $size_check_failcount{$host} if exists $size_check_failcount{$host};
129 if ($res->message =~ m/connect:/) {
130 my $count = $size_check_failcount{$host};
132 $count *= 2 unless $count > 360;
133 $size_check_retry_after{$host} = Time
::HiRes
::time() + $count;
134 $size_check_failcount{$host} = $count;
136 return undeferr
("Failed HEAD check for $path (" . $res->code . "): "
142 my ($self, $alg, $ping_cb, $reason) = @_;
143 my $mogconn = $self->host->mogstored_conn;
144 my $node_timeout = MogileFS
->config("node_timeout");
149 $reason = defined($reason) ?
" $reason" : "";
150 my $uri = $self->{uri
};
151 my $req = "$alg $uri$reason\r\n";
152 my $reqlen = length $req;
154 # a dead/stale socket may not be detected until we try to recv on it
155 # after sending a request
158 # assuming the storage node can checksum at >=2MB/s, low expectations here
159 my $response_timeout = $self->size / (2 * 1024 * 1024);
161 my $flag_nosignal = MogileFS
::Sys
->flag_nosignal;
162 local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;
165 $sock = $mogconn->sock($node_timeout) or return;
166 $rv = send($sock, $req, $flag_nosignal);
167 if ($! || $rv != $reqlen) {
170 if ($retries-- <= 0) {
172 $err = $err ?
"send() error ($req): $err" :
173 "short send() ($req): $rv != $reqlen";
174 $err = $mogconn->{ip
} . ":" . $mogconn->{port
} . " $err";
175 return undeferr
($err);
180 $expiry = Time
::HiRes
::time() + $response_timeout;
181 while (!wait_for_readability
(fileno($sock), 1.0) &&
182 (Time
::HiRes
::time() < $expiry)) {
189 return undeferr
("EOF from mogstored") if ($retries-- <= 0);
191 } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=([a-f0-9]{32,128})\r\n/) {
194 if ($hexdigest eq FILE_MISSING
) {
195 # FIXME, this could be another error like EMFILE/ENFILE
198 my $checksum = eval {
199 MogileFS
::Checksum
->from_string(0, "$alg:$hexdigest")
201 return undeferr
("$alg failed for $uri: $@") if $@
;
202 return $checksum->{checksum
};
203 } elsif ($rv =~ /^ERROR /) {
204 return; # old server, fallback to HTTP
206 return undeferr
("mogstored failed to handle ($alg $uri)");
210 my ($self, $alg, $ping_cb) = @_;
212 # don't SIGPIPE us (why don't we just globally ignore SIGPIPE?)
213 my $flag_nosignal = MogileFS
::Sys
->flag_nosignal;
214 local $SIG{'PIPE'} = "IGNORE" unless $flag_nosignal;
217 my $node_timeout = MogileFS
->config("node_timeout");
218 # Hardcoded connection cache size of 20 :(
219 $user_agent ||= LWP
::UserAgent
->new(timeout
=> $node_timeout, keep_alive
=> 20);
220 my $digest = Digest
->new($alg);
223 # default (4K) is tiny, use 1M like replicate
224 ':read_size_hint' => 0x100000,
225 ':content_cb' => sub {
231 my $path = $self->{url
};
232 my $res = $user_agent->get($path, %opts);
234 return $digest->digest if $res->is_success;
235 return FILE_MISSING
if $res->code == 404;
236 return undeferr
("Failed $alg (GET) check for $path (" . $res->code . "): "
241 my ($self, $alg, $ping_cb, $reason) = @_;
242 my $digest = $self->digest_mgmt($alg, $ping_cb, $reason);
244 return $digest if ($digest && $digest ne FILE_MISSING
);
246 $self->digest_http($alg, $ping_cb);