6 Thread.abort_on_exception = true
8 require 'sleepy_penguin'
10 class TestEpoll < Test::Unit::TestCase
12 RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
20 Epoll.constants.each do |const|
21 next if const.to_sym == :IO
22 nr = Epoll.const_get(const)
23 assert nr <= 0xffffffff, "#{const}=#{nr}"
30 Thread.new { sleep 0.100; @ep.add(@wr, Epoll::OUT) }
31 @ep.wait { |flags,obj| tmp << [ flags, obj ] }
32 elapsed = Time.now - t0
33 assert elapsed >= 0.100
34 assert_equal [[Epoll::OUT, @wr]], tmp, tmp.inspect
39 @ep.add @rd, Epoll::IN
41 @ep.wait(nil, 100) { |flags,obj| tmp << [ flags, obj ] }
45 _, status = Process.waitpid2(pid)
46 assert status.success?
47 @ep.wait(nil, 0) { |flags,obj| tmp << [ flags, obj ] }
48 assert_equal [[Epoll::IN, @rd]], tmp
54 assert ! epdup.closed?
56 exit(!epdup.closed? && @ep.closed?)
58 _, status = Process.waitpid2(pid)
59 assert status.success?, status.inspect
64 def test_after_fork_usability
65 fork { @ep.add(@rd, Epoll::IN); exit!(0) }
66 fork { @ep.set(@rd, Epoll::IN); exit!(0) }
67 fork { @ep.to_io; exit!(0) }
68 fork { @ep.dup; exit!(0) }
69 fork { @ep.clone; exit!(0) }
70 fork { @ep.close; exit!(0) }
71 fork { @ep.closed?; exit!(0) }
81 res.each { |(_,status)| assert status.success? }
84 def test_tcp_connect_nonblock_edge
85 epflags = Epoll::OUT | Epoll::ET
87 srv = TCPServer.new(host, 0)
89 addr = Socket.pack_sockaddr_in(port, host)
90 sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
91 assert_raises(Errno::EINPROGRESS) { sock.connect_nonblock(addr) }
92 IO.select(nil, [ sock ], [sock ])
93 @ep.add(sock, epflags)
95 @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
96 assert_equal [ [Epoll::OUT, sock] ], tmp
99 def test_tcp_connect_edge
100 epflags = Epoll::OUT | Epoll::ET
102 srv = TCPServer.new(host, 0)
104 sock = TCPSocket.new(host, port)
105 @ep.add(sock, epflags)
107 @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
108 assert_equal [ [Epoll::OUT, sock] ], tmp
113 srv = TCPServer.new(host, 0)
115 sock = TCPSocket.new(host, port)
117 assert_equal 3, asock.syswrite("HI\n")
118 @ep.add(asock, Epoll::OUT| Epoll::ET | Epoll::ONESHOT)
120 @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
121 assert_equal [ [Epoll::OUT, asock] ], tmp
125 assert_nothing_raised do
126 @rd.close unless @rd.closed?
127 @wr.close unless @wr.closed?
128 @ep.close unless @ep.closed?
132 def test_max_events_big
133 @ep.add @rd, Epoll::IN
135 thr = Thread.new { @ep.wait(1024) { |flags, obj| tmp << [ flags, obj ] } }
140 assert_equal([[Epoll::IN, @rd]], tmp)
142 thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
144 assert_equal([[Epoll::IN, @rd]], tmp)
147 def test_max_events_small
148 @ep.add @rd, Epoll::IN | Epoll::ET
149 @ep.add @wr, Epoll::OUT | Epoll::ET
152 @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
153 assert_equal 1, tmp.size
154 @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
155 assert_equal 2, tmp.size
158 def test_signal_safe_wait_forever
161 time[:USR1] = Time.now
165 @ep.add @rd, Epoll::IN
168 sleep 0.5 # slightly racy :<
169 Process.kill(:USR1, Process.ppid)
172 time[:START_WAIT] = Time.now
173 assert_nothing_raised do
174 @ep.wait do |flags, obj|
175 tmp << [ flags, obj ]
179 assert_equal([[Epoll::IN, @rd]], tmp)
180 _, status = Process.waitpid2(pid)
181 assert status.success?, status.inspect
182 usr1_delay = time[:USR1] - time[:START_WAIT]
183 assert_in_delta(0.5, usr1_delay, 0.1, "usr1_delay=#{usr1_delay}")
184 ep_delay = time[:EP] - time[:USR1]
185 assert_in_delta(0.5, ep_delay, 0.1, "ep1_delay=#{ep_delay}")
187 trap(:USR1, 'DEFAULT')
191 @ep.add @rd, Epoll::IN
193 thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
196 assert_nil thr.join(0.01)
203 defined?(Epoll::RDHUP) or
204 return warn "skipping test, EPOLLRDHUP not available"
205 rd, wr = UNIXSocket.pair
206 @ep.add rd, Epoll::RDHUP
208 thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
209 wr.shutdown Socket::SHUT_WR
211 assert_equal([[ Epoll::RDHUP, rd ]], tmp)
215 @ep.add @rd, Epoll::IN
217 thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
220 assert_equal([[ Epoll::HUP, @rd ]], tmp)
225 assert_nothing_raised do
227 @ep.add @rd, Epoll::IN
228 @ep.add w, Epoll::OUT
229 @ep.add @wr, Epoll::OUT
232 @ep.wait { |flags, obj| tmp << [ flags, obj ] }
233 assert_equal 2, tmp.size
234 assert_equal [ Epoll::OUT ], tmp.map { |flags, obj| flags }.uniq
235 ios = tmp.map { |flags, obj| obj }
236 assert ios.include?(@wr)
237 assert ios.include?(w)
241 assert_nothing_raised { 4096.times { Epoll.new } }
246 assert_nothing_raised do
249 assert_kind_of IO, ep.to_io
258 assert @ep.to_io.fileno != clone.to_io.fileno
259 clone.add @wr, Epoll::OUT
260 @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
261 assert_equal([[Epoll::OUT, @wr]], tmp)
262 assert_nothing_raised { clone.close }
268 assert @ep.to_io.fileno != clone.to_io.fileno
269 clone.add @wr, Epoll::OUT
270 @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
271 assert_equal([[Epoll::OUT, @wr]], tmp)
272 assert_nothing_raised { clone.close }
275 def test_set_idempotency
276 assert_nothing_raised do
277 @ep.set @rd, Epoll::IN
278 @ep.set @rd, Epoll::IN
279 @ep.set @wr, Epoll::OUT
280 @ep.set @wr, Epoll::OUT
284 def test_wait_timeout
286 assert_equal 0, @ep.wait(nil, 100) { |flags,obj| assert false }
288 assert(diff >= 0.075, "#{diff} < 0.100s")
292 assert_raises(Errno::ENOENT) { @ep.del(@rd) }
293 assert_nothing_raised do
294 @ep.add(@rd, Epoll::IN)
300 @ep.add(@rd, Epoll::IN)
301 assert_equal 0, @ep.wait(nil, 0) { |flags,obj| assert false }
304 nr = @ep.wait(nil, 0) do |flags,obj|
305 assert_equal Epoll::IN, flags
306 assert_equal obj, @rd
314 @ep.add(@wr, Epoll::OUT | Epoll::IN)
316 nr = @ep.wait(nil, 0) do |flags, obj|
317 assert_equal Epoll::OUT, flags
318 assert_equal obj, @wr
325 def test_wait_write_blocked
327 @wr.write_nonblock('.' * 65536)
331 @ep.add(@wr, Epoll::OUT | Epoll::IN)
332 assert_equal 0, @ep.wait(nil, 0) { |flags,event| assert false }
337 @ep.add @rd, Epoll::IN
338 thr = Thread.new { tmp = IO.select([ @ep ]) }
343 assert_equal([[@ep],[],[]], tmp)
346 def test_new_no_cloexec
348 io = Epoll.new(0).to_io
349 assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == 0)
354 io = Epoll.new(Epoll::CLOEXEC).to_io
355 assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == Fcntl::FD_CLOEXEC)
361 if RUBY_VERSION.to_f >= 2.0
362 assert_equal 1, io.fcntl(Fcntl::F_GETFD)
364 assert_equal 0, io.fcntl(Fcntl::F_GETFD)
369 assert_nil @ep.delete(@rd)
370 assert_nil @ep.delete(@wr)
371 assert_nothing_raised { @ep.add @rd, Epoll::IN }
372 assert_equal @rd, @ep.delete(@rd)
373 assert_nil @ep.delete(@rd)
377 @ep.add @rd, Epoll::IN
378 assert_equal @rd, @ep.io_for(@rd.fileno)
379 assert_equal @rd, @ep.io_for(@rd)
381 assert_nil @ep.io_for(@rd.fileno)
382 assert_nil @ep.io_for(@rd)
386 @ep.add @rd, Epoll::IN
387 assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
388 assert_equal Epoll::IN, @ep.flags_for(@rd)
391 assert_nil @ep.flags_for(@rd.fileno)
392 assert_nil @ep.flags_for(@rd)
395 def test_flags_for_sym
397 assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
398 assert_equal Epoll::IN, @ep.flags_for(@rd)
401 assert_nil @ep.flags_for(@rd.fileno)
402 assert_nil @ep.flags_for(@rd)
405 def test_flags_for_sym_ary
406 @ep.add @rd, [:IN, :ET]
407 expect = Epoll::IN | Epoll::ET
408 assert_equal expect, @ep.flags_for(@rd.fileno)
409 assert_equal expect, @ep.flags_for(@rd)
412 assert_nil @ep.flags_for(@rd.fileno)
413 assert_nil @ep.flags_for(@rd)
417 assert ! @ep.include?(@rd)
418 @ep.add @rd, Epoll::IN
419 assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
420 assert @ep.include?(@rd.fileno)
421 assert ! @ep.include?(@wr)
422 assert ! @ep.include?(@wr.fileno)
425 def test_cross_thread_close
427 thr = Thread.new { sleep(1); @ep.close }
428 assert_raises(IOError) do
429 @ep.wait { |flags, obj| tmp << [ flags, obj ] }
432 end if RUBY_VERSION == "1.9.3"
434 def test_epoll_level_trigger
435 @ep.add(@wr, Epoll::OUT)
438 @ep.wait { |flags, obj| tmp = obj }
439 assert_equal @wr, tmp
442 @ep.wait { |flags, obj| tmp = obj }
443 assert_equal @wr, tmp
447 @wr.write_nonblock(buf)
454 @ep.wait { |flags, obj| tmp = obj }
455 assert_equal @wr, tmp
458 def test_epoll_wait_signal_torture
462 @ep.add(@rd, Epoll::IN)
464 trap(:USR1) { usr1 += 1 }
466 trap(:USR1, "DEFAULT")
469 nr.times { Process.kill(:USR1, ppid); sleep 0.01 }
474 assert_nothing_raised { @ep.wait(nil, 100) { |flags,obj| tmp << obj } }
477 _, status = Process.waitpid2(pid)
478 assert status.success?, status.inspect
479 assert usr1 > 0, "usr1: #{usr1}"
481 trap(:USR1, "DEFAULT")
482 end if ENV["STRESS"].to_i != 0
484 def test_wait_one_event_per_thread
493 lock.synchronize { pipes[r] = w }
494 @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT)
499 @ep.wait(maxevents) do |_,obj|
501 assert pipes.include?(obj), "#{obj.inspect} is unknown"
511 pipes.each_value { |w| w.syswrite '.' }
520 thr.each { |t| assert_equal 1, t.value }
521 assert_equal nr, ok.size, ok.inspect
522 assert_equal ok.size, ok.uniq.size, ok.inspect
523 assert_equal ok.map { |io| io.fileno }.sort,
524 pipes.keys.map { |io| io.fileno }.sort
532 def test_epoll_as_queue
533 fl = Epoll::OUT | Epoll::ET
539 @ep.add(r, Epoll::IN | Epoll::ET)
544 @ep.wait(1) { |flags, io| first[1].write('.') if i == 0 }
546 @ep.wait(1) { |flags, io| assert_equal(first[0], io) }
548 end if defined?(SleepyPenguin::Epoll)