Checking in changes prior to tagging of version 2.73.
[MogileFS-Server.git] / lib / Mogstored / SideChannelClient.pm
blobe68421bfedbd4b738a86ac5911ec7d01aaa9137e
1 ### simple package for handling the stream request port
2 package Mogstored::SideChannelClient;
4 use strict;
5 use base qw{Perlbal::Socket};
6 use fields (
7 'count', # how many requests we've serviced
8 'read_buf', # unprocessed read buffer
9 'mogsvc', # the mogstored Perlbal::Service object
11 use Digest;
12 use POSIX qw(O_RDONLY);
13 use Mogstored::TaskQueue;
15 BEGIN {
16 eval { require IO::AIO; };
19 # TODO: interface to make this tunable
20 my %digest_queues;
# We pretend to be a Perlbal::Socket, so we must answer its idle-time
# query. Reporting 0 means this connection should never be idled out.
sub max_idle_time {
    return 0;
}
# Constructor. May be invoked on a class name (a fresh fields-based object
# is allocated) or on an already-allocated object (it is reused as-is).
# Remaining arguments are handed to the parent constructor unchanged.
# Returns the initialised object.
sub new {
    my Mogstored::SideChannelClient $self = shift;

    unless (ref $self) {
        $self = fields::new($self);
    }
    $self->SUPER::new(@_);

    # per-connection state
    $self->{mogsvc}   = Perlbal->service('mogstored'); # the mogstored service object
    $self->{read_buf} = '';                            # unprocessed input
    $self->{count}    = 0;                             # "size" requests seen so far

    return $self;
}
# Reject any uri containing "..".
# Returns the uri unchanged when it is acceptable. Otherwise writes an
# error line to the client and returns nothing (undef in scalar context).
sub validate_uri {
    my ($self, $uri) = @_;

    # the common case: nothing suspicious, hand it straight back
    return $uri unless $uri =~ /\.\./;

    $self->write("ERROR: uri invalid (contains ..)\r\n");
    return;
}
# Readable-socket handler: pull up to 1024 bytes off the socket, append
# them to the pending buffer and process whatever complete lines it holds.
# When read() yields undef the connection is closed instead.
sub event_read {
    my Mogstored::SideChannelClient $self = shift;

    my $chunk = $self->read(1024);
    unless (defined $chunk) {
        return $self->close;
    }

    $self->{read_buf} .= ${$chunk};
    $self->read_buf_consume;
}
# Parse and dispatch every complete line currently held in the read buffer.
#
# Recognised commands:
#   size <uri>               - stat the file under the docroot, reply
#                              "<uri> <size>" (-1 if it does not exist)
#   watch                    - stop reading and subscribe to iostat output
#   MD5|SHA-1 <uri> [reason] - start a digest; parsing stops here and is
#                              resumed once the digest has finished
#   anything else            - forwarded to the Perlbal management interface
#
# Blank lines are skipped. A command carrying an invalid uri gets its error
# reply (written by validate_uri) and parsing moves on to the next line, so
# pipelined commands behind it are not left stranded in the buffer.
sub read_buf_consume {
    my $self = shift;
    my $path = $self->{mogsvc}->{docroot};

    # (.*?) rather than (.+?): "." never matches "\n", so with (.+?) a bare
    # newline at the head of the buffer could never be consumed and every
    # later command would be stuck behind it.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $cmd = $1;
        next unless length $cmd;

        if ($cmd =~ /^size (\S+)$/) {
            # increase our count
            $self->{count}++;

            my $uri = $self->validate_uri($1);
            next unless defined($uri);

            # now stat the file to get the size and such
            Perlbal::AIO::aio_stat("$path$uri", sub {
                return if $self->{closed};
                # "_" reuses the stat buffer filled by the stat just done
                my $size = -e _ ? -s _ : -1;
                $self->write("$uri $size\r\n");
            });
        } elsif ($cmd =~ /^watch$/i) {
            unless (Mogstored->iostat_available) {
                $self->write("ERR iostat unavailable\r\n");
                next;
            }
            $self->watch_read(0);
            Mogstored->iostat_subscribe($self);
        } elsif ($cmd =~ /^(MD5|SHA-1) (\S+)(?: (\w+))?$/) {
            # we can easily enable other hash algorithms with the above
            # regexp, but we won't for now (see MogileFS::Checksum)
            my ($alg, $raw_uri, $reason) = ($1, $2, $3);

            my $uri = $self->validate_uri($raw_uri);
            next unless defined($uri);

            # digest() turns read-watching off; leave the rest of the buffer
            # alone until it reports back through after_long_request
            return $self->digest($alg, $path, $uri, $reason);
        } else {
            # we don't understand this so pass it on to manage command interface
            my @out;
            Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; });
            $self->write(join("\r\n", @out) . "\r\n");
        }
    }
}
# Writable-socket handler. write(undef) kicks off any further pending
# writes; when it reports true there is nothing else to write, so we stop
# watching for writability.
sub event_write {
    my $self = shift;

    my $drained = $self->write(undef);
    $self->watch_write(0) if $drained;
}
# Danga::Socket's stock error/hangup handlers die; ours just close the
# connection instead.
sub event_err {
    my $self = $_[0];
    $self->close;
}
# Hangup handler: close the connection rather than die.
sub event_hup {
    my $self = $_[0];
    $self->close;
}
# as_string handler: the parent's description with our request counter
# appended as "; size_requests=N".
sub as_string {
    my Mogstored::SideChannelClient $self = shift;

    # keep the parent call in scalar context
    my $base = $self->SUPER::as_string;

    return $base . "; size_requests=$self->{count}";
}
# Close the connection: drop any iostat subscription first, then let the
# parent class do the actual close. Any arguments (e.g. a close reason) are
# forwarded to the parent instead of being silently discarded.
# Returns whatever the parent's close returns.
sub close {
    my Mogstored::SideChannelClient $self = shift;

    Mogstored->iostat_unsubscribe($self);
    return $self->SUPER::close(@_);
}
# Graceful-shutdown hook: hand the decision over to Mogstored.
sub die_gracefully {
    return Mogstored->on_sidechannel_die_gracefully;
}
# Start computing a digest of "$path$uri" for the client.
#
#   $alg    - digest algorithm name, passed on to digest_fh
#   $path   - docroot prefix
#   $uri    - already-validated uri, echoed back in the reply
#   $reason - optional; "fsck" routes the work through a per-device
#             task queue instead of starting it immediately
#
# Read-watching is switched off for the duration of the request. If the
# file cannot be opened the reply is "<uri> <alg>=-1".
sub digest {
    my ($self, $alg, $path, $uri, $reason) = @_;

    $self->watch_read(0);

    Perlbal::AIO::aio_open("$path$uri", O_RDONLY, 0, sub {
        my $fh = shift;

        # client went away while the open was in flight
        if ($self->{closed}) {
            CORE::close($fh) if $fh;
            return;
        }

        # open failed: report it and resume normal command processing
        unless ($fh) {
            $self->write("$uri $alg=-1\r\n");
            $self->after_long_request;
            return;
        }

        # Best-effort read-ahead hint. Only attempted once we know we hold a
        # real handle and still have a client; the eval keeps a failure here
        # (e.g. IO::AIO not loadable, see the BEGIN probe) from being fatal.
        eval {
            IO::AIO::fadvise(fileno($fh), 0, 0, IO::AIO::FADV_SEQUENTIAL());
        };

        if ($reason && $reason eq "fsck") {
            # fstat(2) should return immediately, no AIO needed
            my $devid = (stat($fh))[0];
            # an undef key would stringify to '' anyway; avoid the warning
            $devid = '' unless defined $devid;

            my $queue = $digest_queues{$devid} ||= Mogstored::TaskQueue->new;
            $queue->run(sub { $self->digest_fh($alg, $fh, $uri, $queue) });
        } else {
            $self->digest_fh($alg, $fh, $uri);
        }
    });
}
# Feed an open file through a Digest object in 1 MiB (0x100000 byte)
# asynchronous reads and report the result to the client.
#
#   $alg   - algorithm name given to Digest->new
#   $fh    - open filehandle; always closed by this routine
#   $uri   - echoed back in the reply line
#   $queue - optional task queue; task_done is called on it when we finish,
#            whatever the outcome
#
# Replies "<uri> <alg>=<hexdigest>" on success, or
# "ERR read <uri> at <offset> failed" on a read error.
sub digest_fh {
    my ($self, $alg, $fh, $uri, $queue) = @_;
    my $offset = 0;
    my $data = '';
    my $digest = Digest->new($alg);
    my $cb;

    # $cb refers to itself so it can re-arm the next read; every exit path
    # sets it to undef to break that reference cycle.
    $cb = sub {
        my $retval = shift;

        # Client already gone: no point reading the rest of the file. Still
        # release the handle and the queue slot. (watch_read/write are not
        # touched; the socket is closed.)
        if ($self->{closed}) {
            $cb = undef;
            CORE::close($fh);
            $queue->task_done if $queue;
            return;
        }

        # NOTE(review): an undef result is treated as a read failure. It
        # would otherwise compare == 0 and be reported as a valid digest of
        # partial data. Presumably a sysread-based fallback inside
        # Perlbal::AIO can hand us undef -- confirm against Perlbal::AIO.
        if (defined($retval) && $retval > 0) {
            $offset += length($data);
            $digest->add($data);
            Perlbal::AIO::aio_read($fh, $offset, 0x100000, $data, $cb);
        } elsif (defined($retval) && $retval == 0) { # EOF
            $cb = undef;
            CORE::close($fh);
            my $hex = $digest->hexdigest;
            $self->write("$uri $alg=$hex\r\n");
            $queue->task_done if $queue;
            $self->after_long_request;
        } else {
            $cb = undef;
            CORE::close($fh);
            $self->write("ERR read $uri at $offset failed\r\n");
            $queue->task_done if $queue;
            $self->after_long_request; # should we try to continue?
        }
    };

    Perlbal::AIO::aio_read($fh, $offset, 0x100000, $data, $cb);
}
# Called when a long-running request (a digest) has finished: resume
# normal command processing on this connection.
#
# Read-watching is re-enabled unconditionally *before* any buffered lines
# are processed. If one of those lines starts another long request, that
# request turns read-watching off again itself. Re-enabling only when the
# buffer held no complete line left the socket unread forever whenever the
# buffered commands were all short ones (e.g. "size").
sub after_long_request {
    my $self = shift;

    $self->watch_read(1);

    # only bother parsing if at least one complete line is waiting
    if (index($self->{read_buf}, "\n") >= 0) {
        $self->read_buf_consume;
    }
}