1 # -*- encoding: binary -*-
3 # FIXME: lots of duplication from xepolll_thread_spawn/client
5 module Rainbows::XEpollThreadPool::Client
6 Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
8 ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
9 extend Rainbows::WorkerYield
11 def self.included(klass) # included in Rainbows::Client
12 max = Rainbows.server.worker_connections
13 ACCEPTORS.map! do |sock|
17 if io = sock.kgio_accept(klass)
21 worker_yield while N[0] >= max
23 Rainbows::Error.listen_loop(e)
24 end while Rainbows.alive
29 def self.app_run(queue)
30 while client = queue.pop
36 Rainbows::O[:pool_size].times { Thread.new { app_run(QUEUE) } }
38 ep = SleepyPenguin::Epoll
40 IN = ep::IN | ep::ONESHOT
42 KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
46 LOCK.synchronize { clients = KATO.keys; KATO.clear }
47 clients.each { |io| io.closed? or io.close }
49 @@last_expire = Rainbows.now
52 LOCK.synchronize { KATO[self] = @@last_expire }
57 LOCK.synchronize { KATO.delete self }
63 EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
67 Rainbows::Error.listen_loop(e)
68 end while Rainbows.tick || N[0] > 0
69 Rainbows::JoinThreads.acceptors(ACCEPTORS)
73 return if ((now = Rainbows.now) - @@last_expire) < 1.0
74 if (ot = KEEPALIVE_TIMEOUT) >= 0
78 KATO.delete_if { |client, time| time < ot and defer << client }
80 defer.each { |io| io.closed? or io.shutdown }
86 @hp = Rainbows::HttpParser.new
109 case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
114 @hp.add_parse(buf) and return queue!
123 process_pipeline(@hp.env, @hp)
126 def pipeline_ready(hp)
127 # be fair to other clients, let others run first
128 hp.parse and return queue!