1 # private base class for poolable HTTP/Mogstored sidechannel connections
2 # This is currently only used by HTTP, but is intended for Mogstored
4 package MogileFS
::Connection
::Poolable
;
8 use base
qw(Danga::Socket);
10 'mfs_pool', # owner of the connection (MogileFS::ConnectionPool)
11 'mfs_hostport', # [ ip, port ]
12 'mfs_expire', # Danga::Socket::Timer object
13 'mfs_expire_cb', # Danga::Socket::Timer callback
14 'mfs_requests', # number of requests made on this object
15 'mfs_err', # used to propagate an error to start()
16 'mfs_writeq', # arrayref if connecting, undef otherwise
18 use Socket
qw(SO_KEEPALIVE);
21 # subclasses (MogileFS::Connection::{HTTP,Mogstored}) must call this sub
23 my ($self, $sock, $ip, $port) = @_;
24 $self->SUPER::new
($sock); # Danga::Socket->new
26 # connection may not be established, yet
27 # so Danga::Socket->peer_addr_string can't be used here
28 $self->{mfs_hostport
} = [ $ip, $port ];
29 $self->{mfs_requests
} = 0;
31 # newly-created socket, we buffer writes until event_write is triggered
32 $self->{mfs_writeq
} = [];
37 # used by ConnectionPool for tracking per-hostport connection counts
38 sub key
{ join(':', @
{$_[0]->{mfs_hostport
}}); }
40 # backwards compatibility
41 sub host_port
{ $_[0]->key; }
43 sub ip_port
{ @
{$_[0]->{mfs_hostport
}}; }
45 sub fd
{ fileno($_[0]->sock); }
47 # marks a connection as idle, call this before putting it in a connection
48 # pool for eventual reuse.
54 # set the keepalive flag the first time we're idle
55 $self->sock->sockopt(SO_KEEPALIVE
, 1) if $self->{mfs_requests
} == 0;
57 $self->{mfs_requests
}++;
61 my ($self, $arg) = @_;
62 my $writeq = $self->{mfs_writeq
};
64 if (ref($writeq) eq "ARRAY") {
65 # if we're still connecting, we must buffer explicitly for *BSD
66 # and not attempt a real write() until event_write is triggered
68 $self->watch_write(1); # enable event_write triggering
69 0; # match Danga::Socket::write return value
71 $self->SUPER::write($arg);
75 # Danga::Socket will trigger this when a socket is writable
79 # we may have buffered writes in mfs_writeq during non-blocking connect(),
80 # this is needed on *BSD but unnecessary (but harmless) on Linux.
81 my $writeq = delete $self->{mfs_writeq
};
83 $self->watch_write(0); # ->write will re-enable if needed
84 foreach my $queued (@
$writeq) {
85 $self->write($queued);
88 $self->SUPER::event_write
();
92 # the request running on this connection is retryable if this socket
93 # has ever been marked idle. The connection pool can never be 100%
94 # reliable for detecting dead sockets, and all HTTP requests made by
95 # MogileFS are idempotent.
97 my ($self, $reason) = @_;
98 return ($reason !~ /timeout/ && $self->{mfs_requests
} > 0);
101 # Sets (or updates) the timeout of the connection
102 # timeout_key is "node_timeout" or "conn_timeout"
103 # clears the current timeout if timeout_key is undef
105 my ($self, $timeout_key) = @_;
106 my $mfs_pool = $self->{mfs_pool
};
108 $self->SetPostLoopCallback(undef);
112 if ($timeout_key =~ /\A[a-z_]+\z/) {
113 $timeout = MogileFS
->config($timeout_key) || 2;
115 $timeout = $timeout_key;
116 $timeout_key = "timeout";
119 my $t0 = Time
::HiRes
::time();
120 $self->{mfs_expire
} = $t0 + $timeout;
121 $self->{mfs_expire_cb
} = sub {
123 my $elapsed = $now - $t0;
125 # for HTTP, this will fake an HTTP error response like LWP does
126 $self->err("$timeout_key: $timeout (elapsed: $elapsed)");
128 $mfs_pool->register_timeout($self, $timeout) if $mfs_pool;
130 $self->{mfs_expire
} = $self->{mfs_expire_cb
} = undef;
131 $mfs_pool->register_timeout($self, undef) if $mfs_pool;
135 # returns the expiry time of the connection
136 sub expiry
{ $_[0]->{mfs_expire
} }
138 # runs expiry callback and returns true if time is up,
139 # returns false if there is time remaining
141 my ($self, $now) = @_;
142 my $expire = $self->{mfs_expire
} or return 0;
143 $now ||= Time
::HiRes
::time();
145 if ($now >= $expire) {
146 my $expire_cb = delete $self->{mfs_expire_cb
};
147 if ($expire_cb && $self->sock) {
148 $self->SetPostLoopCallback(sub { $expire_cb->($now); 1 });
155 # may be overriden in subclass, called only on errors
156 # The HTTP version of this will fake an HTTP response for LWP compatibility
158 my ($self, $close_reason) = @_;
160 $self->inflight_expire; # ensure we don't call new_err on eventual close()
162 if ($close_reason =~ /\A:event_(?:hup|err)\z/) {
163 # there's a chance this can be invoked while inflight,
164 # conn_drop will handle this case appropriately
165 $self->{mfs_pool
}->conn_drop($self, $close_reason) if $self->{mfs_pool
};
167 $self->close($close_reason);
171 # sets the pool this connection belongs to, only call from ConnectionPool
173 my ($self, $pool) = @_;
175 $self->{mfs_pool
} = $pool;
178 # closes a connection, and may reschedule the inflight callback if
179 # close_reason is ":retry"
181 my ($self, $close_reason) = @_;
183 delete $self->{mfs_expire_cb
}; # avoid circular ref
185 my $mfs_pool = delete $self->{mfs_pool
}; # avoid circular ref
189 $mfs_pool->schedule_queued;
190 $inflight_cb = $mfs_pool->conn_close_prepare($self, $close_reason);
192 $self->SUPER::close($close_reason); # Danga::Socket->close
194 if ($inflight_cb && $close_reason) {
195 if ($close_reason eq ":retry") {
196 my ($ip, $port) = $self->ip_port;
198 $mfs_pool->enqueue($ip, $port, $inflight_cb);
200 # Danga::Socket-scheduled write()s which fail with ECONNREFUSED,
201 # EPIPE, or "write_error" after an initial (non-blocking)
203 $mfs_pool->on_next_tick(sub {
204 ref($self)->new_err($close_reason || "error", $inflight_cb);
210 # Marks a connection as no-longer inflight. Calling this prevents retries.
211 sub inflight_expire
{
213 my $mfs_pool = $self->{mfs_pool
};
214 die "BUG: expiring without MogileFS::ConnectionPool\n" unless $mfs_pool;
215 $mfs_pool->inflight_cb_expire($self);
218 # Danga::Socket callbacks
219 sub event_hup
{ $_[0]->err(':event_hup'); }
220 sub event_err
{ $_[0]->err(':event_err'); }
222 # called when we couldn't create a socket, but need to create an object
223 # anyways for errors (creating fake, LWP-style error responses)
225 my ($class, $err, $start_cb) = @_;
226 my $self = fields
::new
($class);
227 $self->{mfs_err
} = $err;
232 # returns this connection back to its associated pool.
233 # Returns false if not successful (pool is full)
236 my $mfs_pool = $self->{mfs_pool
};
238 return $mfs_pool ?
$mfs_pool->conn_persist($self) : 0;