1 # -*- encoding: binary -*-
4 module Rainbows::Epoll::Client
6 include Rainbows::EvCore
7 APP = Rainbows.server.app
8 Server = Rainbows::Epoll::Server
9 IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ONESHOT
10 OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ONESHOT
13 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
14 Rainbows.at_quit { KATO.each_key { |k| k.timeout! }.clear }
15 Rainbows.config!(self, :keepalive_timeout)
17 @@last_expire = Rainbows.now
20 return if ((now = Rainbows.now) - @@last_expire) < 1.0
21 if (ot = KEEPALIVE_TIMEOUT) >= 0
23 KATO.delete_if { |client, time| time < ot and client.timeout! }
30 EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
34 Rainbows::Error.listen_loop(e)
35 end while Rainbows.tick || Server.nr > 0
40 @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
48 case rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
51 return if @wr_queue[0] || closed?
52 return hijacked if @hp.hijacked?
54 KATO[self] = @@last_expire if :headers == @state
55 return EP.set(self, IN)
58 end until :close == @state
60 rescue Errno::ECONNRESET
# Runs the Rack application (APP) for the current request and passes the
# resulting status/headers/body on to ev_write_response.
# +input+ is stored as the request's 'rack.input'.
65 def app_call input # called by on_read()
66 @env['rack.input'] = input
67 @env['REMOTE_ADDR'] = kgio_addr
68 @hp.hijack_setup(self)
69 status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS)) # merge! updates @env in place
70 return hijacked if @hp.hijacked? # parser reports the connection as hijacked: skip the normal response path
71 ev_write_response(status, headers, body, @hp.next?) # @hp.next? is passed as the +alive+ argument
# Writes a response by dispatching on the kind of file behind the body.
# NOTE(review): +io+ and +st+ are assigned on lines not shown in this
# excerpt, as is the condition guarding the defer_file branch -- confirm
# against the full file.
74 def write_response_path(status, headers, body, alive)
79 defer_file(status, headers, body, alive, io, st)
80 elsif st.socket? || st.pipe?
81 chunk = stream_response_headers(status, headers, alive, body)
82 return hijacked if nil == chunk # a nil result from stream_response_headers is treated as a hijack
83 stream_response_body(body, io, chunk)
85 # char or block device: unexpected here, fall back to the generic write_response
86 write_response(status, headers, body, alive)
90 # used for streaming sockets and pipes
# Wraps +io+ in a ResponseChunkPipe when +chunk+ is truthy, otherwise in a
# ResponsePipe, then either queues the wrapper or starts streaming from it.
91 def stream_response_body(body, io, chunk)
92 pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
93 Rainbows::Epoll::ResponsePipe).new(io, self, body)
94 return @wr_queue << pipe if @wr_queue[0] # writes are already queued: append the pipe behind them
95 stream_pipe(pipe) or return # nothing more to do when stream_pipe returns nil/false
# NOTE(review): the empty frozen string looks like a placeholder that keeps
# @wr_queue non-empty for code that checks @wr_queue[0] / @wr_queue.size --
# confirm intent.
96 @wr_queue[0] or @wr_queue << ''.freeze
# Records the post-response state and writes the response.
# +alive+ truthy sets @state to :headers, otherwise to :close.
99 def ev_write_response(status, headers, body, alive)
100 @state = alive ? :headers : :close
101 if body.respond_to?(:to_path) # bodies exposing to_path go through write_response_path
102 write_response_path(status, headers, body, alive)
104 write_response(status, headers, body, alive)
106 return hijacked if @hp.hijacked?
107 # try to read more if we didn't have to buffer writes
108 next_request if alive && 0 == @wr_queue.size
113 Server.decr # no other place to do this
122 # pipelined request (already in buffer)
124 return if @wr_queue[0] || closed?
125 return hijacked if @hp.hijacked?
126 close if :close == @state
140 EP.set(self, EPINOUT)
143 def on_deferred_write_complete
144 :close == @state and return close
149 msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
# Dispatches a queued object to its writer: Rainbows::StreamFile instances
# go to stream_file, anything else goes to stream_pipe. Returns whatever the
# chosen method returns.
154 def write_deferred(obj)
155 Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
158 # writes until our write buffer is empty or we block
159 # returns true if we're done writing everything
161 obj = @wr_queue.shift
163 case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
165 obj = @wr_queue.shift or return on_deferred_write_complete
168 when :wait_writable # Strings and StreamFiles only
169 @wr_queue.unshift(obj)
181 case rv = kgio_trywrite(buf)
187 @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
188 return EP.set(self, OUT)
191 @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
195 @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
210 # Rack apps should not hijack here, but they may...
# Writes the response headers, then streams file content via
# defer_file_stream. Returns +hijacked+ whenever write_headers returns
# nil/false.
211 def defer_file(status, headers, body, alive, io, st)
212 if r = sendfile_range(status, headers)
213 status, headers, range = r # sendfile_range supplies replacement status/headers and an optional range
214 write_headers(status, headers, alive, body) or return hijacked
215 range and defer_file_stream(range[0], range[1], io, body) # no body is streamed when range is nil
# the lines below run when sendfile_range returned nil/false (the `else`
# line is not shown in this excerpt): stream st.size bytes from offset 0
217 write_headers(status, headers, alive, body) or return hijacked
218 defer_file_stream(0, st.size, io, body)
222 # returns +nil+ on EOF, :wait_writable if the client blocks
# NOTE(review): the `when`/`else` lines of this case statement are not shown
# in this excerpt; the comments below assume +n+ is a byte count in the
# branch that adjusts sf.count -- confirm against the full file.
223 def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
224 case n = trysendfile(sf, sf.offset, sf.count)
227 0 == (sf.count -= n) and return sf.close # subtract n from the remaining count; close +sf+ once it reaches zero
229 return n # :wait_writable or nil
236 def defer_file_stream(offset, count, io, body)
237 sf = Rainbows::StreamFile.new(offset, count, io, body)
239 stream_file(sf) or return
245 # this alternates between a push and pull model from the pipe -> client
246 # to avoid having too much data in userspace on either end.
247 def stream_pipe(pipe)
248 case buf = pipe.tryread
252 # client is blocked on write, client will pull from pipe later
258 # continue looping...
260 # pipe blocked on read, let the pipe push to the client in the future
265 return pipe.close # nil