Ruby 1.9.3+-only cleanups
[rainbows.git] / lib / rainbows / epoll / client.rb
blob85e504cb2224bef35f14335992843e09e65784eb
1 # -*- encoding: binary -*-
2 # :enddoc:
4 module Rainbows::Epoll::Client
6   include Rainbows::EvCore
7   APP = Rainbows.server.app
8   Server = Rainbows::Epoll::Server
9   IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ONESHOT
10   OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ONESHOT
11   EPINOUT = IN | OUT
12   KATO = {}.compare_by_identity
13   Rainbows.at_quit { KATO.each_key(&:timeout!).clear }
14   Rainbows.config!(self, :keepalive_timeout)
15   EP = Rainbows::EP
16   @@last_expire = Rainbows.now
18   def self.expire
19     return if ((now = Rainbows.now) - @@last_expire) < 1.0
20     if (ot = KEEPALIVE_TIMEOUT) >= 0
21       ot = now - ot
22       KATO.delete_if { |client, time| time < ot and client.timeout! }
23     end
24     @@last_expire = now
25   end
27   def self.loop
28     begin
29       EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
30       expire
31     rescue Errno::EINTR
32     rescue => e
33       Rainbows::Error.listen_loop(e)
34     end while Rainbows.tick || Server.nr > 0
35   end
37   # only call this once
38   def epoll_once
39     @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
40     post_init
41     on_readable
42     rescue => e
43       handle_error(e)
44   end
46   def on_readable
47     case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
48     when String
49       on_read(rv)
50       return if @wr_queue[0] || closed?
51       return hijacked if @hp.hijacked?
52     when :wait_readable
53       KATO[self] = @@last_expire if :headers == @state
54       return EP.set(self, IN)
55     else
56       break
57     end until :close == @state
58     close unless closed?
59     rescue Errno::ECONNRESET
60       close
61     rescue IOError
62   end
64   def app_call input # called by on_read()
65     @env['rack.input'] = input
66     @env['REMOTE_ADDR'] = kgio_addr
67     @hp.hijack_setup(self)
68     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
69     return hijacked if @hp.hijacked?
70     ev_write_response(status, headers, body, @hp.next?)
71   end
73   def write_response_path(status, headers, body, alive)
74     io = body_to_io(body)
75     st = io.stat
77     if st.file?
78       defer_file(status, headers, body, alive, io, st)
79     elsif st.socket? || st.pipe?
80       chunk = stream_response_headers(status, headers, alive, body)
81       return hijacked if nil == chunk
82       stream_response_body(body, io, chunk)
83     else
84       # char or block device... WTF?
85       write_response(status, headers, body, alive)
86     end
87   end
89   # used for streaming sockets and pipes
90   def stream_response_body(body, io, chunk)
91     pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
92                     Rainbows::Epoll::ResponsePipe).new(io, self, body)
93     return @wr_queue << pipe if @wr_queue[0]
94     stream_pipe(pipe) or return
95     @wr_queue[0] or @wr_queue << ''.freeze
96   end
98   def ev_write_response(status, headers, body, alive)
99     @state = alive ? :headers : :close
100     if body.respond_to?(:to_path)
101       write_response_path(status, headers, body, alive)
102     else
103       write_response(status, headers, body, alive)
104     end
105     return hijacked if @hp.hijacked?
106     # try to read more if we didn't have to buffer writes
107     next_request if alive && 0 == @wr_queue.size
108   end
110   def hijacked
111     KATO.delete(self)
112     Server.decr # no other place to do this
113     EP.delete(self)
114     nil
115   end
117   def next_request
118     if 0 == @buf.size
119       want_more
120     else
121       # pipelined request (already in buffer)
122       on_read(''.freeze)
123       return if @wr_queue[0] || closed?
124       return hijacked if @hp.hijacked?
125       close if :close == @state
126     end
127   end
129   def epoll_run
130     if @wr_queue[0]
131       on_writable
132     else
133       KATO.delete self
134       on_readable
135     end
136   end
138   def want_more
139     EP.set(self, EPINOUT)
140   end
142   def on_deferred_write_complete
143     :close == @state and return close
144     next_request
145   end
147   def handle_error(e)
148     msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
149     ensure
150       close
151   end
153   def write_deferred(obj)
154     Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
155   end
157   # writes until our write buffer is empty or we block
158   # returns true if we're done writing everything
159   def on_writable
160     obj = @wr_queue.shift
162     case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
163     when nil
164       obj = @wr_queue.shift or return on_deferred_write_complete
165     when String
166       obj = rv # retry
167     when :wait_writable # Strings and StreamFiles only
168       @wr_queue.unshift(obj)
169       EP.set(self, OUT)
170       return
171     when :deferred
172       return
173     end while true
174     rescue => e
175       handle_error(e)
176   end
178   def write(buf)
179     unless @wr_queue[0]
180       case rv = kgio_trywrite(buf)
181       when nil
182         return # all written
183       when String
184         buf = rv # retry
185       when :wait_writable
186         @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
187         return EP.set(self, OUT)
188       end while true
189     end
190     @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
191   end
193   def close
194     @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
195     super
196     on_close
197   end
199   def on_close
200     KATO.delete(self)
201     Server.decr
202   end
204   def timeout!
205     shutdown
206     true
207   end
209   # Rack apps should not hijack here, but they may...
210   def defer_file(status, headers, body, alive, io, st)
211     if r = sendfile_range(status, headers)
212       status, headers, range = r
213       write_headers(status, headers, alive, body) or return hijacked
214       range and defer_file_stream(range[0], range[1], io, body)
215     else
216       write_headers(status, headers, alive, body) or return hijacked
217       defer_file_stream(0, st.size, io, body)
218     end
219   end
221   # returns +nil+ on EOF, :wait_writable if the client blocks
222   def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
223     case n = trysendfile(sf, sf.offset, sf.count)
224     when Integer
225       sf.offset += n
226       0 == (sf.count -= n) and return sf.close
227     else
228       return n # :wait_writable or nil
229     end while true
230     rescue
231       sf.close
232       raise
233   end
235   def defer_file_stream(offset, count, io, body)
236     sf = Rainbows::StreamFile.new(offset, count, io, body)
237     unless @wr_queue[0]
238       stream_file(sf) or return
239     end
240     @wr_queue << sf
241     EP.set(self, OUT)
242   end
244   # this alternates between a push and pull model from the pipe -> client
245   # to avoid having too much data in userspace on either end.
246   def stream_pipe(pipe)
247     case buf = pipe.tryread
248     when String
249       write(buf)
250       if @wr_queue[0]
251         # client is blocked on write, client will pull from pipe later
252         EP.delete pipe
253         @wr_queue << pipe
254         EP.set(self, OUT)
255         return :deferred
256       end
257       # continue looping...
258     when :wait_readable
259       # pipe blocked on read, let the pipe push to the client in the future
260       EP.delete self
261       EP.set(pipe, IN)
262       return :deferred
263     else # nil => EOF
264       return pipe.close # nil
265     end while true
266     rescue
267       pipe.close
268       raise
269   end