epoll: avoid race condition in EINTR checking
[sleepy_penguin.git] / test / test_epoll.rb
blobd6f319f2b4b8adb0469d9a33c0616158b1a554bb
1 require 'test/unit'
2 require 'fcntl'
3 require 'socket'
4 $-w = true
6 require 'sleepy_penguin'
8 class TestEpoll < Test::Unit::TestCase
9   include SleepyPenguin
10   RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
12   def setup
13     @rd, @wr = IO.pipe
14     @ep = Epoll.new
15   end
17   def test_constants
18     Epoll.constants.each do |const|
19       next if const.to_sym == :IO
20       nr = Epoll.const_get(const)
21       assert nr <= 0xffffffff, "#{const}=#{nr}"
22     end
23   end
25   def test_cross_thread
26     tmp = []
27     Thread.new { sleep 0.100; @ep.add(@wr, Epoll::OUT) }
28     t0 = Time.now
29     @ep.wait { |flags,obj| tmp << [ flags, obj ] }
30     elapsed = Time.now - t0
31     assert elapsed >= 0.100
32     assert_equal [[Epoll::OUT, @wr]], tmp
33   end
35   def test_fork_safe
36     tmp = []
37     @ep.add @rd, Epoll::IN
38     pid = fork do
39       @ep.wait(nil, 100) { |flags,obj| tmp << [ flags, obj ] }
40       exit!(tmp.empty?)
41     end
42     @wr.syswrite "HI"
43     _, status = Process.waitpid2(pid)
44     assert status.success?
45     @ep.wait(nil, 0) { |flags,obj| tmp << [ flags, obj ] }
46     assert_equal [[Epoll::IN, @rd]], tmp
47   end
49   def test_after_fork_usability
50     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
51     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
52     fork { @ep.to_io; exit!(0) }
53     fork { @ep.dup; exit!(0) }
54     fork { @ep.clone; exit!(0) }
55     fork { @ep.close; exit!(0) }
56     fork { @ep.closed?; exit!(0) }
57     fork {
58       begin
59         @ep.del(@rd)
60       rescue Errno::ENOENT
61         exit!(0)
62       end
63       exit!(1)
64     }
65     res = Process.waitall
66     res.each { |(_,status)| assert status.success? }
67   end
69   def test_tcp_connect_nonblock_edge
70     epflags = Epoll::OUT | Epoll::ET
71     host = '127.0.0.1'
72     srv = TCPServer.new(host, 0)
73     port = srv.addr[1]
74     addr = Socket.pack_sockaddr_in(port, host)
75     sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
76     assert_raises(Errno::EINPROGRESS) { sock.connect_nonblock(addr) }
77     IO.select(nil, [ sock ], [sock ])
78     @ep.add(sock, epflags)
79     tmp = []
80     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
81     assert_equal [ [Epoll::OUT,  sock] ], tmp
82   end
84   def test_tcp_connect_edge
85     epflags = Epoll::OUT | Epoll::ET
86     host = '127.0.0.1'
87     srv = TCPServer.new(host, 0)
88     port = srv.addr[1]
89     sock = TCPSocket.new(host, port)
90     @ep.add(sock, epflags)
91     tmp = []
92     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
93     assert_equal [ [Epoll::OUT,  sock] ], tmp
94   end
96   def teardown
97     assert_nothing_raised do
98       @rd.close unless @rd.closed?
99       @wr.close unless @wr.closed?
100       @ep.close unless @ep.closed?
101     end
102   end
104   def test_max_events_big
105     @ep.add @rd, Epoll::IN
106     tmp = []
107     thr = Thread.new { @ep.wait(1024) { |flags, obj| tmp << [ flags, obj ] } }
108     Thread.pass
109     assert tmp.empty?
110     @wr.write '.'
111     thr.join
112     assert_equal([[Epoll::IN, @rd]], tmp)
113     tmp.clear
114     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
115     thr.join
116     assert_equal([[Epoll::IN, @rd]], tmp)
117   end
119   def test_max_events_small
120     @ep.add @rd, Epoll::IN | Epoll::ET
121     @ep.add @wr, Epoll::OUT | Epoll::ET
122     @wr.write '.'
123     tmp = []
124     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
125     assert_equal 1, tmp.size
126     @ep.wait(1) { |flags, obj| tmp << [ flags, obj ] }
127     assert_equal 2, tmp.size
128   end
130   def test_signal_safe_wait_forever
131     time = {}
132     trap(:USR1) do
133       time[:USR1] = Time.now
134       sleep 0.5
135       @wr.write '.'
136     end
137     @ep.add @rd, Epoll::IN
138     tmp = []
139     pid = fork do
140       sleep 0.5 # slightly racy :<
141       Process.kill(:USR1, Process.ppid)
142       exit!(0)
143     end
144     time[:START_WAIT] = Time.now
145     assert_nothing_raised do
146       @ep.wait do |flags, obj|
147         tmp << [ flags, obj ]
148         time[:EP] = Time.now
149       end
150     end
151     assert_equal([[Epoll::IN, @rd]], tmp)
152     _, status = Process.waitpid2(pid)
153     assert status.success?, status.inspect
154     usr1_delay = time[:USR1] - time[:START_WAIT]
155     assert_in_delta(0.5, usr1_delay, 0.1, "usr1_delay=#{usr1_delay}")
156     ep_delay = time[:EP] - time[:USR1]
157     assert_in_delta(0.5, ep_delay, 0.1, "ep1_delay=#{ep_delay}")
158     ensure
159       trap(:USR1, 'DEFAULT')
160   end unless RBX
162   def test_close
163     @ep.add @rd, Epoll::IN
164     tmp = []
165     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
166     @rd.close
167     @wr.close
168     assert_nil thr.join(0.01)
169     assert thr.alive?
170     thr.kill
171     assert tmp.empty?
172   end
174   def test_rdhup
175     defined?(Epoll::RDHUP) or
176       return warn "skipping test, EPOLLRDHUP not available"
177     rd, wr = UNIXSocket.pair
178     @ep.add rd, Epoll::RDHUP
179     tmp = []
180     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
181     wr.shutdown Socket::SHUT_WR
182     thr.join
183     assert_equal([[ Epoll::RDHUP, rd ]], tmp)
184   end
186   def test_hup
187     @ep.add @rd, Epoll::IN
188     tmp = []
189     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
190     @wr.close
191     thr.join
192     assert_equal([[ Epoll::HUP, @rd ]], tmp)
193   end
195   def test_multiple
196     r, w = IO.pipe
197     assert_nothing_raised do
198       @ep.add r, Epoll::IN
199       @ep.add @rd, Epoll::IN
200       @ep.add w, Epoll::OUT
201       @ep.add @wr, Epoll::OUT
202     end
203     tmp = []
204     @ep.wait { |flags, obj| tmp << [ flags, obj ] }
205     assert_equal 2, tmp.size
206     assert_equal [ Epoll::OUT ], tmp.map { |flags, obj| flags }.uniq
207     ios = tmp.map { |flags, obj| obj }
208     assert ios.include?(@wr)
209     assert ios.include?(w)
210   end
212   def test_gc
213     assert_nothing_raised { 4096.times { Epoll.new } }
214     assert ! @ep.closed?
215   end unless RBX
217   def test_gc_to_io
218     assert_nothing_raised do
219       4096.times do
220         ep = Epoll.new
221         assert_kind_of IO, ep.to_io
222       end
223     end
224     assert ! @ep.closed?
225   end unless RBX
227   def test_clone
228     tmp = []
229     clone = @ep.clone
230     assert @ep.to_io.fileno != clone.to_io.fileno
231     clone.add @wr, Epoll::OUT
232     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
233     assert_equal([[Epoll::OUT, @wr]], tmp)
234     assert_nothing_raised { clone.close }
235   end
237   def test_dup
238     tmp = []
239     clone = @ep.dup
240     assert @ep.to_io.fileno != clone.to_io.fileno
241     clone.add @wr, Epoll::OUT
242     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
243     assert_equal([[Epoll::OUT, @wr]], tmp)
244     assert_nothing_raised { clone.close }
245   end
247   def test_set_idempotency
248     assert_nothing_raised do
249       @ep.set @rd, Epoll::IN
250       @ep.set @rd, Epoll::IN
251       @ep.set @wr, Epoll::OUT
252       @ep.set @wr, Epoll::OUT
253     end
254   end
256   def test_wait_timeout
257     t0 = Time.now
258     assert_equal 0, @ep.wait(nil, 100) { |flags,obj| assert false }
259     diff = Time.now - t0
260     assert(diff >= 0.075, "#{diff} < 0.100s")
261   end
263   def test_del
264     assert_raises(Errno::ENOENT) { @ep.del(@rd) }
265     assert_nothing_raised do
266       @ep.add(@rd, Epoll::IN)
267       @ep.del(@rd)
268     end
269   end
271   def test_wait_read
272     @ep.add(@rd, Epoll::IN)
273     assert_equal 0, @ep.wait(nil, 0) { |flags,obj| assert false }
274     @wr.syswrite '.'
275     i = 0
276     nr = @ep.wait(nil, 0) do |flags,obj|
277       assert_equal Epoll::IN, flags
278       assert_equal obj, @rd
279       i += 1
280     end
281     assert_equal 1, i
282     assert_equal 1, nr
283   end
285   def test_wait_write
286     @ep.add(@wr, Epoll::OUT | Epoll::IN)
287     i = 0
288     nr = @ep.wait(nil, 0) do |flags, obj|
289       assert_equal Epoll::OUT, flags
290       assert_equal obj, @wr
291       i += 1
292     end
293     assert_equal 1, nr
294     assert_equal 1, i
295   end
297   def test_wait_write_blocked
298     begin
299       @wr.write_nonblock('.' * 65536)
300     rescue Errno::EAGAIN
301       break
302     end while true
303     @ep.add(@wr, Epoll::OUT | Epoll::IN)
304     assert_equal 0, @ep.wait(nil, 0) { |flags,event| assert false }
305   end
307   def test_selectable
308     tmp = nil
309     @ep.add @rd, Epoll::IN
310     thr = Thread.new { tmp = IO.select([ @ep ]) }
311     thr.join 0.01
312     assert_nil tmp
313     @wr.write '.'
314     thr.join
315     assert_equal([[@ep],[],[]], tmp)
316   end
318   def test_new_no_cloexec
319     @ep.close
320     io = Epoll.new(0).to_io
321     assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == 0)
322   end
324   def test_new_cloexec
325     @ep.close
326     io = Epoll.new(Epoll::CLOEXEC).to_io
327     assert((io.fcntl(Fcntl::F_GETFD) & Fcntl::FD_CLOEXEC) == Fcntl::FD_CLOEXEC)
328   end
330   def test_new
331     @ep.close
332     io = Epoll.new.to_io
333     assert_equal 0, io.fcntl(Fcntl::F_GETFD)
334   end
336   def test_delete
337     assert_nil @ep.delete(@rd)
338     assert_nil @ep.delete(@wr)
339     assert_nothing_raised { @ep.add @rd, Epoll::IN }
340     assert_equal @rd, @ep.delete(@rd)
341     assert_nil @ep.delete(@rd)
342   end
344   def test_io_for
345     @ep.add @rd, Epoll::IN
346     assert_equal @rd, @ep.io_for(@rd.fileno)
347     assert_equal @rd, @ep.io_for(@rd)
348     @ep.del @rd
349     assert_nil @ep.io_for(@rd.fileno)
350     assert_nil @ep.io_for(@rd)
351   end
353   def test_flags_for
354     @ep.add @rd, Epoll::IN
355     assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
356     assert_equal Epoll::IN, @ep.flags_for(@rd)
358     @ep.del @rd
359     assert_nil @ep.flags_for(@rd.fileno)
360     assert_nil @ep.flags_for(@rd)
361   end
363   def test_flags_for_sym
364     @ep.add @rd, :IN
365     assert_equal Epoll::IN, @ep.flags_for(@rd.fileno)
366     assert_equal Epoll::IN, @ep.flags_for(@rd)
368     @ep.del @rd
369     assert_nil @ep.flags_for(@rd.fileno)
370     assert_nil @ep.flags_for(@rd)
371   end
373   def test_flags_for_sym_ary
374     @ep.add @rd, [:IN, :ET]
375     expect = Epoll::IN | Epoll::ET
376     assert_equal expect, @ep.flags_for(@rd.fileno)
377     assert_equal expect, @ep.flags_for(@rd)
379     @ep.del @rd
380     assert_nil @ep.flags_for(@rd.fileno)
381     assert_nil @ep.flags_for(@rd)
382   end
384   def test_include?
385     assert ! @ep.include?(@rd)
386     @ep.add @rd, Epoll::IN
387     assert @ep.include?(@rd)
388     assert @ep.include?(@rd.fileno)
389     assert ! @ep.include?(@wr)
390     assert ! @ep.include?(@wr.fileno)
391   end
393   def test_cross_thread_close
394     tmp = []
395     thr = Thread.new { sleep(1); @ep.close }
396     assert_raises(IOError) do
397       @ep.wait { |flags, obj| tmp << [ flags, obj ] }
398     end
399     assert_nil thr.value
400   end if RUBY_VERSION == "1.9.3"
402   def test_epoll_wait_signal_torture
403     usr1 = 0
404     empty = 0
405     nr = 1000
406     @ep.add(@rd, Epoll::IN)
407     tmp = []
408     trap(:USR1) { usr1 += 1 }
409     pid = fork do
410       trap(:USR1, "DEFAULT")
411       sleep 0.1
412       ppid = Process.ppid
413       nr.times { Process.kill(:USR1, ppid); sleep 0.01 }
414       @wr.syswrite('.')
415       exit!(0)
416     end
417     while tmp.empty?
418       assert_nothing_raised { @ep.wait(nil, 100) { |flags,obj| tmp << obj } }
419       empty += 1
420     end
421     _, status = Process.waitpid2(pid)
422     assert status.success?, status.inspect
423     assert usr1 > 0, "usr1: #{usr1}"
424     ensure
425       trap(:USR1, "DEFAULT")
426   end