preliminary kqueue support
[sleepy_penguin.git] / test / test_epoll.rb
blob7c648ffe6f1d8aa395c2f2cecd2c5c7cc637e9c4
1 require 'test/unit'
2 require 'fcntl'
3 require 'socket'
4 require 'thread'
5 $-w = true
6 Thread.abort_on_exception = true
8 require 'sleepy_penguin'
10 class TestEpoll < Test::Unit::TestCase
11   include SleepyPenguin
12   RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
14   def setup
15     @rd, @wr = IO.pipe
16     @ep = Epoll.new
17   end
19   def test_constants
20     Epoll.constants.each do |const|
21       next if const.to_sym == :IO
22       nr = Epoll.const_get(const)
23       assert nr <= 0xffffffff, "#{const}=#{nr}"
24     end
25   end
27   def test_cross_thread
28     tmp = []
29     t0 = Time.now
30     Thread.new { sleep 0.100; @ep.add(@wr, Epoll::OUT) }
31     @ep.wait { |flags,obj| tmp << [ flags, obj ] }
32     elapsed = Time.now - t0
33     assert elapsed >= 0.100
34     assert_equal [[Epoll::OUT, @wr]], tmp, tmp.inspect
35   end
37   def test_fork_safe
38     tmp = []
39     @ep.add @rd, Epoll::IN
40     pid = fork do
41       @ep.wait(nil, 100) { |flags,obj| tmp << [ flags, obj ] }
42       exit!(tmp.empty?)
43     end
44     @wr.syswrite "HI"
45     _, status = Process.waitpid2(pid)
46     assert status.success?
47     @ep.wait(nil, 0) { |flags,obj| tmp << [ flags, obj ] }
48     assert_equal [[Epoll::IN, @rd]], tmp
49   end
51   def test_dup_and_fork
52     epdup = @ep.dup
53     @ep.close
54     assert ! epdup.closed?
55     pid = fork do
56       exit(!epdup.closed? && @ep.closed?)
57     end
58     _, status = Process.waitpid2(pid)
59     assert status.success?, status.inspect
60   ensure
61     epdup.close
62   end
64   def test_after_fork_usability
65     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
66     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
67     fork { @ep.to_io; exit!(0) }
68     fork { @ep.dup; exit!(0) }
69     fork { @ep.clone; exit!(0) }
70     fork { @ep.close; exit!(0) }
71     fork { @ep.closed?; exit!(0) }
72     fork {
73       begin
74         @ep.del(@rd)
75       rescue Errno::ENOENT
76         exit!(0)
77       end
78       exit!(1)
79     }
80     res = Process.waitall
81     res.each { |(_,status)| assert status.success? }
82   end
84   def test_tcp_connect_nonblock_edge
85     epflags = Epoll::OUT | Epoll::ET
86     host = '127.0.0.1'
87     srv = TCPServer.new(host, 0)
88     port = srv.addr[1]
89     addr = Socket.pack_sockaddr_in(port, host)
90     sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
91     assert_raises(Errno::EINPROGRESS) { sock.connect_nonblock(addr) }
92     IO.select(nil, [ sock ], [sock ])
93     @ep.add(sock, epflags)
94     tmp = []
95     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
96     assert_equal [ [Epoll::OUT,  sock] ], tmp
97   end
99   def test_tcp_connect_edge
100     epflags = Epoll::OUT | Epoll::ET
101     host = '127.0.0.1'
102     srv = TCPServer.new(host, 0)
103     port = srv.addr[1]
104     sock = TCPSocket.new(host, port)
105     @ep.add(sock, epflags)
106     tmp = []
107     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
108     assert_equal [ [Epoll::OUT,  sock] ], tmp
109   end
111   def test_edge_accept
112     host = '127.0.0.1'
113     srv = TCPServer.new(host, 0)
114     port = srv.addr[1]
115     sock = TCPSocket.new(host, port)
116     asock = srv.accept
117     assert_equal 3, asock.syswrite("HI\n")
118     @ep.add(asock, Epoll::OUT| Epoll::ET | Epoll::ONESHOT)
119     tmp = []
120     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
121     assert_equal [ [Epoll::OUT,  asock] ], tmp
122   end
124   def teardown
125     assert_nothing_raised do
126       @rd.close unless @rd.closed?
127       @wr.close unless @wr.closed?
128       @ep.close unless @ep.closed?
129     end
130   end
132   def test_max_events_big
133     @ep.add @rd, Epoll::IN
134     tmp = []
135     thr = Thread.new { @ep.wait(1024) { |flags, obj| tmp << [ flags, obj ] } }
136     Thread.pass
137     assert tmp.empty?
138     @wr.write '.'
139     thr.join
140     assert_equal([[Epoll::IN, @rd]], tmp)
141     tmp.clear
142     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
143     thr.join
144     assert_equal([[Epoll::IN, @rd]], tmp)
145   end
147   def test_max_events_small
148     @ep.add @rd, Epoll::IN | Epoll::ET
149     @ep.add @wr, Epoll::OUT | Epoll::ET
150     @wr.write '.'
151     tmp = []
152     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
153     assert_equal 1, tmp.size
154     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
155     assert_equal 2, tmp.size
156   end
158   def test_signal_safe_wait_forever
159     time = {}
160     trap(:USR1) do
161       time[:USR1] = Time.now
162       sleep 0.5
163       @wr.write '.'
164     end
165     @ep.add @rd, Epoll::IN
166     tmp = []
167     pid = fork do
168       sleep 0.5 # slightly racy :<
169       Process.kill(:USR1, Process.ppid)
170       exit!(0)
171     end
172     time[:START_WAIT] = Time.now
173     assert_nothing_raised do
174       @ep.wait do |flags, obj|
175         tmp << [ flags, obj ]
176         time[:EP] = Time.now
177       end
178     end
179     assert_equal([[Epoll::IN, @rd]], tmp)
180     _, status = Process.waitpid2(pid)
181     assert status.success?, status.inspect
182     usr1_delay = time[:USR1] - time[:START_WAIT]
183     assert_in_delta(0.5, usr1_delay, 0.1, "usr1_delay=#{usr1_delay}")
184     ep_delay = time[:EP] - time[:USR1]
185     assert_in_delta(0.5, ep_delay, 0.1, "ep1_delay=#{ep_delay}")
186     ensure
187       trap(:USR1, 'DEFAULT')
188   end unless RBX
190   def test_close
191     @ep.add @rd, Epoll::IN
192     tmp = []
193     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
194     @rd.close
195     @wr.close
196     assert_nil thr.join(0.01)
197     assert thr.alive?
198     thr.kill
199     assert tmp.empty?
200   end
202   def test_rdhup
203     defined?(Epoll::RDHUP) or
204       return warn "skipping test, EPOLLRDHUP not available"
205     rd, wr = UNIXSocket.pair
206     @ep.add rd, Epoll::RDHUP
207     tmp = []
208     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
209     wr.shutdown Socket::SHUT_WR
210     thr.join
211     assert_equal([[ Epoll::RDHUP, rd ]], tmp)
212   end
214   def test_hup
215     @ep.add @rd, Epoll::IN
216     tmp = []
217     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
218     @wr.close
219     thr.join
220     assert_equal([[ Epoll::HUP, @rd ]], tmp)
221   end
223   def test_multiple
224     r, w = IO.pipe
225     assert_nothing_raised do
226       @ep.add r, Epoll::IN
227       @ep.add @rd, Epoll::IN
228       @ep.add w, Epoll::OUT
229       @ep.add @wr, Epoll::OUT
230     end
231     tmp = []
232     @ep.wait { |flags, obj| tmp << [ flags, obj ] }
233     assert_equal 2, tmp.size
234     assert_equal [ Epoll::OUT ], tmp.map { |flags, obj| flags }.uniq
235     ios = tmp.map { |flags, obj| obj }
236     assert ios.include?(@wr)
237     assert ios.include?(w)
238   end
240   def test_gc
241     assert_nothing_raised { 4096.times { Epoll.new } }
242     assert ! @ep.closed?
243   end unless RBX
245   def test_gc_to_io
246     assert_nothing_raised do
247       4096.times do
248         ep = Epoll.new
249         assert_kind_of IO, ep.to_io
250       end
251     end
252     assert ! @ep.closed?
253   end unless RBX
255   def test_clone
256     tmp = []
257     clone = @ep.clone
258     assert @ep.to_io.fileno != clone.to_io.fileno
259     clone.add @wr, Epoll::OUT
260     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
261     assert_equal([[Epoll::OUT, @wr]], tmp)
262     assert_nothing_raised { clone.close }
263   end
265   def test_dup
266     tmp = []
267     clone = @ep.dup
268     assert @ep.to_io.fileno != clone.to_io.fileno
269     clone.add @wr, Epoll::OUT
270     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
271     assert_equal([[Epoll::OUT, @wr]], tmp)
272     assert_nothing_raised { clone.close }
273   end
275   def test_set_idempotency
276     assert_nothing_raised do
277       @ep.set @rd, Epoll::IN
278       @ep.set @rd, Epoll::IN
279       @ep.set @wr, Epoll::OUT
280       @ep.set @wr, Epoll::OUT
281     end
282   end
284   def test_wait_timeout
285     t0 = Time.now
286     assert_equal 0, @ep.wait(nil, 100) { |flags,obj| assert false }
287     diff = Time.now - t0
288     assert(diff >= 0.075, "#{diff} < 0.100s")
289   end
291   def test_del
292     assert_raises(Errno::ENOENT) { @ep.del(@rd) }
293     assert_nothing_raised do
294       @ep.add(@rd, Epoll::IN)
295       @ep.del(@rd)
296     end
297   end
299   def test_wait_read
300     @ep.add(@rd, Epoll::IN)
301     assert_equal 0, @ep.wait(nil, 0) { |flags,obj| assert false }
302     @wr.syswrite '.'
303     i = 0
304     nr = @ep.wait(nil, 0) do |flags,obj|
305       assert_equal Epoll::IN, flags
306       assert_equal obj, @rd
307       i += 1
308     end
309     assert_equal 1, i
310     assert_equal 1, nr
311   end
313   def test_wait_write
314     @ep.add(@wr, Epoll::OUT | Epoll::IN)
315     i = 0
316     nr = @ep.wait(nil, 0) do |flags, obj|
317       assert_equal Epoll::OUT, flags
318       assert_equal obj, @wr
319       i += 1
320     end
321     assert_equal 1, nr
322     assert_equal 1, i
323   end
325   def test_wait_write_blocked
326     begin
327       @wr.write_nonblock('.' * 65536)
328     rescue Errno::EAGAIN
329       break
330     end while true
331     @ep.add(@wr, Epoll::OUT | Epoll::IN)
332     assert_equal 0, @ep.wait(nil, 0) { |flags,event| assert false }
333   end
335   def test_selectable
336     tmp = nil
337     @ep.add @rd, Epoll::IN
338     thr = Thread.new { tmp = IO.select([ @ep ]) }
339     thr.join 0.01
340     assert_nil tmp
341     @wr.write '.'
342     thr.join
343     assert_equal([[@ep],[],[]], tmp)
344   end
346   def test_new_no_cloexec
347     @ep.close
348     io = Epoll.new(0).to_io
349     assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == 0)
350   end
352   def test_new_cloexec
353     @ep.close
354     io = Epoll.new(Epoll::CLOEXEC).to_io
355     assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == Fcntl::FD_CLOEXEC)
356   end
358   def test_new
359     @ep.close
360     io = Epoll.new.to_io
361     if RUBY_VERSION.to_f >= 2.0
362       assert_equal 1, io.fcntl(Fcntl::F_GETFD)
363     else
364       assert_equal 0, io.fcntl(Fcntl::F_GETFD)
365     end
366   end
368   def test_delete
369     assert_nil @ep.delete(@rd)
370     assert_nil @ep.delete(@wr)
371     assert_nothing_raised { @ep.add @rd, Epoll::IN }
372     assert_equal @rd, @ep.delete(@rd)
373     assert_nil @ep.delete(@rd)
374   end
376   def test_io_for
377     @ep.add @rd, Epoll::IN
378     assert_equal @rd, @ep.io_for(@rd.fileno)
379     assert_equal @rd, @ep.io_for(@rd)
380     @ep.del @rd
381     assert_nil @ep.io_for(@rd.fileno)
382     assert_nil @ep.io_for(@rd)
383   end
385   def test_flags_for
386     @ep.add @rd, Epoll::IN
387     assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
388     assert_equal Epoll::IN, @ep.flags_for(@rd)
390     @ep.del @rd
391     assert_nil @ep.flags_for(@rd.fileno)
392     assert_nil @ep.flags_for(@rd)
393   end
395   def test_flags_for_sym
396     @ep.add @rd, :IN
397     assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
398     assert_equal Epoll::IN, @ep.flags_for(@rd)
400     @ep.del @rd
401     assert_nil @ep.flags_for(@rd.fileno)
402     assert_nil @ep.flags_for(@rd)
403   end
405   def test_flags_for_sym_ary
406     @ep.add @rd, [:IN, :ET]
407     expect = Epoll::IN | Epoll::ET
408     assert_equal expect, @ep.flags_for(@rd.fileno)
409     assert_equal expect, @ep.flags_for(@rd)
411     @ep.del @rd
412     assert_nil @ep.flags_for(@rd.fileno)
413     assert_nil @ep.flags_for(@rd)
414   end
416   def test_include?
417     assert ! @ep.include?(@rd)
418     @ep.add @rd, Epoll::IN
419     assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
420     assert @ep.include?(@rd.fileno)
421     assert ! @ep.include?(@wr)
422     assert ! @ep.include?(@wr.fileno)
423   end
425   def test_cross_thread_close
426     tmp = []
427     thr = Thread.new { sleep(1); @ep.close }
428     assert_raises(IOError) do
429       @ep.wait { |flags, obj| tmp << [ flags, obj ] }
430     end
431     assert_nil thr.value
432   end if RUBY_VERSION == "1.9.3"
434   def test_epoll_level_trigger
435     @ep.add(@wr, Epoll::OUT)
437     tmp = nil
438     @ep.wait { |flags, obj| tmp = obj }
439     assert_equal @wr, tmp
441     tmp = nil
442     @ep.wait { |flags, obj| tmp = obj }
443     assert_equal @wr, tmp
445     buf = '.' * 16384
446     begin
447       @wr.write_nonblock(buf)
448     rescue Errno::EAGAIN
449       break
450     end while true
451     @rd.read(16384)
453     tmp = nil
454     @ep.wait { |flags, obj| tmp = obj }
455     assert_equal @wr, tmp
456   end
458   def test_epoll_wait_signal_torture
459     usr1 = 0
460     empty = 0
461     nr = 100
462     @ep.add(@rd, Epoll::IN)
463     tmp = []
464     trap(:USR1) { usr1 += 1 }
465     pid = fork do
466       trap(:USR1, "DEFAULT")
467       sleep 0.1
468       ppid = Process.ppid
469       nr.times { Process.kill(:USR1, ppid); sleep 0.01 }
470       @wr.syswrite('.')
471       exit!(0)
472     end
473     while tmp.empty?
474       assert_nothing_raised { @ep.wait(nil, 100) { |flags,obj| tmp << obj } }
475       empty += 1
476     end
477     _, status = Process.waitpid2(pid)
478     assert status.success?, status.inspect
479     assert usr1 > 0, "usr1: #{usr1}"
480     ensure
481       trap(:USR1, "DEFAULT")
482   end if ENV["STRESS"].to_i != 0
484   def test_wait_one_event_per_thread
485     thr = []
486     pipes = {}
487     lock = Mutex.new
488     maxevents = 1
489     ok = []
490     nr = 10
491     nr.times do
492       r, w = IO.pipe
493       lock.synchronize { pipes[r] = w }
494       @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT)
496       t = Thread.new do
497         sleep 2
498         events = 0
499         @ep.wait(maxevents) do |_,obj|
500           lock.synchronize do
501             assert pipes.include?(obj), "#{obj.inspect} is unknown"
502             ok << obj
503           end
504           events += 1
505         end
506         events
507       end
508       thr << t
509     end
510     lock.synchronize do
511       pipes.each_value { |w| w.syswrite '.' }
512     end
513     thr.each do |t|
514       begin
515         t.run
516       rescue ThreadError
517       end
518     end
520     thr.each { |t| assert_equal 1, t.value }
521     assert_equal nr, ok.size, ok.inspect
522     assert_equal ok.size, ok.uniq.size, ok.inspect
523     assert_equal ok.map { |io| io.fileno }.sort,
524                  pipes.keys.map { |io| io.fileno }.sort
525   ensure
526     pipes.each do |r,w|
527       r.close
528       w.close
529     end
530   end
532   def test_epoll_as_queue
533     fl = Epoll::OUT | Epoll::ET
534     first = nil
535     500.times do
536       r, w = IO.pipe
537       @ep.add(w, fl)
538       first ||= begin
539         @ep.add(r, Epoll::IN | Epoll::ET)
540         [ r, w ]
541       end
542     end
543     500.times do |i|
544       @ep.wait(1) { |flags, io| first[1].write('.') if i == 0 }
545     end
546     @ep.wait(1) { |flags, io| assert_equal(first[0], io) }
547   end
548 end if defined?(SleepyPenguin::Epoll)