1 # -*- encoding: binary -*-
7 class TestMogileFS__MogileFS < TestMogileFS
10 @klass = MogileFS::MogileFS
15 t = TempServer.new(blk)
# Reads lines from +client+ with +gets+ and returns +headers+ as soon as a
# bare "\r\n" line is read.
# NOTE(review): this listing omits lines (embedded numbers jump 20 -> 22 -> 24),
# so where +headers+ is initialized and appended to is not visible here.
20 def read_headers(client)
22 while line = client.gets
# a lone CRLF line ends the loop and hands back what was collected
24 return headers if line == "\r\n"
29 assert_equal 'test', @client.domain
31 assert_raises ArgumentError do
32 MogileFS::MogileFS.new :hosts => ['kaa:6001']
# Verifies that get_file_data('key') returns the body "data!" served over
# HTTP by a local test server, with the backend advertising two paths
# (dev1 on t1's port, dev2 on t2's port).
# NOTE(review): this listing omits lines (embedded numbers jump), so the
# creation of t1/t2 and any write to +accept+ are not visible. Presumably each
# accepted connection appends one byte to +accept+, making its size a
# connection counter -- confirm against the full file.
36 def test_get_file_data_http
37 tmp = Tempfile.new('accept')
38 accept = File.open(tmp.path, "ab")
# server side: accept one connection, read the request headers, reply 200
39 svr = Proc.new do |serv, port|
40 client, _ = serv.accept
42 readed = read_headers(client)
44 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
47 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
52 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
53 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
55 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
57 assert_equal 'data!', @client.get_file_data('key')
# the +accept+ file is expected to be exactly one byte long afterwards
58 assert_equal 1, accept.stat.size
# Verifies failover on HTTP 404: svr1 answers the dev1 request with
# "404 Not Found", svr2 answers the dev2 request with "200 OK" and the body
# "data!", and get_file_data('key') must still return 'data!'.
# NOTE(review): this listing omits lines, so the creation of t1/t2 and any
# write to +accept+ are not visible; the final size of 2 presumably reflects
# one byte per accepted connection -- confirm.
61 def test_get_file_data_http_not_found_failover
62 tmp = Tempfile.new('accept')
63 accept = File.open(tmp.path, 'ab')
# first server: expects the dev1 path and replies 404
64 svr1 = Proc.new do |serv, port|
65 client, _ = serv.accept
67 readed = read_headers(client)
69 %r{\AGET /dev1/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
72 client.send("HTTP/1.0 404 Not Found\r\n\r\ndata!", 0)
# second server: expects the dev2 path and replies 200 with the body
76 svr2 = Proc.new do |serv, port|
77 client, _ = serv.accept
79 readed = read_headers(client)
81 %r{\AGET /dev2/0/000/000/0000000062\.fid HTTP/1.[01].*\r\n},
84 client.send("HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\ndata!", 0)
90 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
91 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
92 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
94 assert_equal 'data!', @client.get_file_data('key')
95 assert_equal 2, accept.stat.size
# Verifies the block form of get_file_data with a large response: a canned
# HTTP response (header plus +nr+ chunks of 1 MiB of spaces) is written to a
# tempfile, streamed to the client by the test server, and read back in the
# block via sysread before the total size is compared with +expect_size+.
# NOTE(review): this listing omits lines; the assignment of +nr+, the loop
# around sysread and the creation of t1/t2 are not visible here.
98 def test_get_file_data_http_block
99 tmpfp = Tempfile.new('test_mogilefs.open_data')
101 chunk_size = 1024 * 1024
102 expect_size = nr * chunk_size
103 header = "HTTP/1.0 200 OK\r\n" \
104 "Content-Length: #{expect_size}\r\n\r\n"
# syswrite returns the byte count, so each write is checked for completeness
105 assert_equal header.size, tmpfp.syswrite(header)
106 nr.times { assert_equal chunk_size, tmpfp.syswrite(' ' * chunk_size) }
107 assert_equal expect_size + header.size, File.size(tmpfp.path)
110 accept = Tempfile.new('accept')
111 svr = Proc.new do |serv, port|
112 client, _ = serv.accept
115 readed = read_headers(client)
117 %r{\AGET /dev[12]/0/000/000/0000000062\.fid HTTP/1.[01]\r\n},
# send the prepared response file to the client
119 MogileFS.io.copy_stream(tmpfp, client)
125 path1 = "http://127.0.0.1:#{t1.port}/dev1/0/000/000/0000000062.fid"
126 path2 = "http://127.0.0.1:#{t2.port}/dev2/0/000/000/0000000062.fid"
128 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
130 data = Tempfile.new('test_mogilefs.dest_data')
132 @client.get_file_data('key') do |fp|
136 fp.sysread(16384, buf)
139 assert_equal read_nr, data.syswrite(buf), "partial write"
147 assert_equal expect_size, nr, "size mismatch"
148 assert_equal 1, accept.stat.size
152 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
153 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
155 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
157 expected = [ path1, path2 ]
159 assert_equal expected, @client.get_paths('key').sort
163 path1 = 'http://rur-1/dev1/0/000/000/0000000062.fid'
164 path2 = 'http://rur-2/dev2/0/000/000/0000000062.fid'
166 @backend.get_paths = { 'paths' => 2, 'path1' => path1, 'path2' => path2 }
168 expected = [ URI.parse(path1), URI.parse(path2) ]
170 assert_equal expected, @client.get_uris('key')
# Verifies that get_paths('key') raises MogileFS::Backend::UnknownKeyError
# when the backend's get_paths response is set to ['unknown_key', ''].
174 def test_get_paths_unknown_key
175 @backend.get_paths = ['unknown_key', '']
177 assert_raises MogileFS::Backend::UnknownKeyError do
# NOTE(review): if get_paths raises as expected, the surrounding
# assert_equal never gets to compare anything; only the raise is tested.
178 assert_equal nil, @client.get_paths('key')
# Calls delete with the backend's delete response set to an empty hash; the
# visible lines contain no explicit assertion, so the test passes as long as
# the call does not raise.
182 def test_delete_existing
183 @backend.delete = { }
184 @client.delete 'no_such_key'
# Verifies that delete raises MogileFS::Backend::UnknownKeyError when the
# backend's delete response is set to the pair 'unknown_key', ''.
187 def test_delete_nonexisting
188 @backend.delete = 'unknown_key', ''
189 assert_raises MogileFS::Backend::UnknownKeyError do
190 @client.delete('no_such_key')
# Verifies that delete raises MogileFS::ReadOnlyError once the client has
# been switched to readonly.
194 def test_delete_readonly
195 @client.readonly = true
196 assert_raises MogileFS::ReadOnlyError do
197 @client.delete 'no_such_key'
202 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_2',
203 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
204 @backend.list_keys = { 'key_count' => 2, 'next_after' => 'new_key_4',
205 'key_1' => 'new_key_3', 'key_2' => 'new_key_4' }
206 @backend.list_keys = { 'key_count' => 0, 'next_after' => 'new_key_4' }
208 @client.each_key 'new' do |key|
212 assert_equal %w[new_key_1 new_key_2 new_key_3 new_key_4], keys
216 @backend.list_keys = { 'key_count' => '2', 'next_after' => 'new_key_2',
217 'key_1' => 'new_key_1', 'key_2' => 'new_key_2' }
219 keys, next_after = @client.list_keys 'new'
220 assert_equal ['new_key_1', 'new_key_2'], keys.sort
221 assert_equal 'new_key_2', next_after
# Verifies that new_file raises MogileFS::ReadOnlyError on a readonly client.
# NOTE(review): despite the "_http" name, the visible lines are identical to
# test_new_file_readonly and exercise no HTTP path -- confirm whether a
# distinct HTTP scenario was intended.
224 def test_new_file_http
225 @client.readonly = true
226 assert_raises MogileFS::ReadOnlyError do
227 @client.new_file 'new_key', 'test'
# Verifies that new_file raises MogileFS::ReadOnlyError once the client has
# been switched to readonly.
231 def test_new_file_readonly
232 @client.readonly = true
233 assert_raises MogileFS::ReadOnlyError do
234 @client.new_file 'new_key', 'test'
# Verifies store_file with a small file: a tempfile containing "data" is
# stored via a test server whose URL is handed out through create_open; the
# raw request captured in +received+ must be a PUT to /path with
# "Content-Length: 4", a blank line, and "data" as the last segment.
# NOTE(review): this listing omits lines (the rest of the create_open hash,
# any assertion on +nr+, and the rewind of +received+ are not visible).
238 def test_store_file_small_http
239 received = Tempfile.new('received')
240 to_store = Tempfile.new('small')
241 to_store.syswrite('data')
243 t = tmpsrv(Proc.new do |serv, accept|
244 client, _ = serv.accept
# capture the request until the body text shows up, then acknowledge
245 while buf = client.readpartial(666)
246 received.syswrite(buf)
247 break if buf =~ /data/
249 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
253 @backend.create_open = {
255 'path' => "http://127.0.0.1:#{t.port}/path",
257 nr = @client.store_file 'new_key', 'test', to_store.path
# split the captured request on CRLF: a[0] is the request line, a[-1] the body
261 a = received.sysread(999999).split(/\r\n/)
262 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
263 assert_equal("data", a[-1])
264 assert_equal("", a[-2])
265 assert a.grep(%r{\AContent-Length: 4\z})[0]
# Verifies store_content with the string 'data': the request captured by the
# test server must be a PUT to /path with "Content-Length: 4", a blank line,
# and "data" as the last segment.
# NOTE(review): this listing omits lines; the initialization and update of
# +seen+, the rest of the create_open hash and any check on +nr+ are not
# visible here.
268 def test_store_content_http
269 received = Tempfile.new('received')
271 t = tmpsrv(Proc.new do |serv, accept|
272 client, _ = serv.accept
# keep reading until the header terminator followed by the body has been seen
275 while seen !~ /\r\n\r\ndata/
276 buf = client.readpartial(4096)
278 received.syswrite(buf)
280 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
284 @backend.create_open = {
286 'path' => "http://127.0.0.1:#{t.port}/path",
289 nr = @client.store_content 'new_key', 'test', 'data'
294 a = received.sysread(999999).split(/\r\n/)
295 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
296 assert_equal("data", a[-1])
297 assert_equal("", a[-2])
298 assert a.grep(%r{\AContent-Length: 4\z})[0]
# Verifies store_content when the content is a MogileFS::Util::StoreContent
# object of declared length 40 whose block emits "data" through
# +write_callback+; the captured request must be a PUT to /path with
# "Content-Length: 40" and a body of "data" repeated ten times.
# NOTE(review): this listing omits lines; the loop that invokes
# write_callback (presumably ten times), the handling of +seen+ and the use
# of +expected+ are not visible here.
302 def test_store_content_with_writer_callback
303 received = Tempfile.new('received')
304 expected = "PUT /path HTTP/1.0\r\nContent-Length: 40\r\n\r\n"
308 t = tmpsrv(Proc.new do |serv, accept|
309 client, _ = serv.accept
314 buf = client.readpartial(8192) or break
315 break if buf.length == 0
316 assert_equal buf.length, received.syswrite(buf)
# stop once the header terminator plus all ten "data" pieces have arrived
319 break if seen =~ /\r\n\r\n(?:data){10}/
321 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
325 @backend.create_open = {
327 'path' => "http://127.0.0.1:#{t.port}/path",
330 cbk = MogileFS::Util::StoreContent.new(40) do |write_callback|
332 write_callback.call("data")
335 assert_equal 40, cbk.length
336 nr = @client.store_content('new_key', 'test', cbk)
340 a = received.sysread(999999).split(/\r\n/)
341 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
342 assert_equal("data" * 10, a[-1])
343 assert_equal("", a[-2])
344 assert a.grep(%r{\AContent-Length: 40\z})[0]
# Re-runs test_store_content_multi_dest_failover with big_io = true.
347 def test_store_content_multi_dest_failover_path
348 test_store_content_multi_dest_failover(true)
# Verifies upload failover across two destinations: the first test server
# answers the PUT with "500 Internal Server Error", the second with "200 OK",
# and both must have received the same PUT /path request carrying "data"
# with "Content-Length: 4".
#
# big_io - defaults to false. NOTE(review): the branch is not visible in this
#          listing; presumably it selects store_file (from a tempfile path)
#          over store_content -- confirm against the full file.
351 def test_store_content_multi_dest_failover(big_io = false)
352 received1 = Tempfile.new('received')
353 received2 = Tempfile.new('received')
# first destination: records the request, then reports a server error
355 t1 = tmpsrv(Proc.new do |serv, accept|
356 client, _ = serv.accept
358 while seen !~ /\r\n\r\ndata/
359 buf = client.readpartial(4096)
361 received1.syswrite(buf)
363 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
# second destination: records the request, then reports success
367 t2 = tmpsrv(Proc.new do |serv, accept|
368 client, _ = serv.accept
370 while seen !~ /\r\n\r\ndata/
371 buf = client.readpartial(4096)
373 received2.syswrite(buf)
375 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
379 @backend.create_open = {
382 'path_1' => "http://127.0.0.1:#{t1.port}/path",
384 'path_2' => "http://127.0.0.1:#{t2.port}/path",
388 tmp = Tempfile.new('data')
391 nr = @client.store_file('new_key', 'test', tmp.path)
394 nr = @client.store_content 'new_key', 'test', 'data'
# both captured requests are split on CRLF and compared field by field
399 a = received1.sysread(4096).split(/\r\n/)
400 b = received2.sysread(4096).split(/\r\n/)
401 assert_equal a[0], b[0]
402 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
403 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, b[0])
404 assert_equal("data", a[-1])
405 assert_equal("data", b[-1])
406 assert_equal("", a[-2])
407 assert_equal("", b[-2])
408 assert a.grep(%r{\AContent-Length: 4\z})[0]
409 assert b.grep(%r{\AContent-Length: 4\z})[0]
# Verifies that store_content raises MogileFS::HTTPFile::NoStorageNodesError
# when the only advertised destination answers with
# "500 Internal Server Error".
# NOTE(review): this listing omits lines, including the rest of the
# create_open hash and whatever the server does before replying.
412 def test_store_content_http_fail
413 t = tmpsrv(Proc.new do |serv, accept|
414 client, _ = serv.accept
417 client.send("HTTP/1.0 500 Internal Server Error\r\n\r\n", 0)
421 @backend.create_open = {
423 'path' => "http://127.0.0.1:#{t.port}/path",
426 assert_raises MogileFS::HTTPFile::NoStorageNodesError do
427 @client.store_content 'new_key', 'test', 'data'
# Verifies store_content with an empty string: the captured request must be
# a PUT to /path that includes "Content-Length: 0".
# NOTE(review): this listing omits lines; any check on +nr+ and the rewind of
# +received+ are not visible here.
431 def test_store_content_http_empty
432 received = Tempfile.new('received')
433 t = tmpsrv(Proc.new do |serv, accept|
434 client, _ = serv.accept
# a single recv of up to 4096 bytes is recorded before acknowledging
436 received.syswrite(client.recv(4096, 0))
437 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
441 @backend.create_open = {
443 'path' => "http://127.0.0.1:#{t.port}/path",
446 nr = @client.store_content 'new_key', 'test', ''
449 a = received.sysread(4096).split(/\r\n/)
450 assert_match(%r{\APUT /path HTTP/1\.[01]\z}, a[0])
451 assert a.grep(%r{\AContent-Length: 0\z})[0]
# Verifies that store_content raises MogileFS::UnsupportedPathError for the
# destination configured through create_open.
# NOTE(review): the contents of the create_open hash are not visible in this
# listing; going by the test name it presumably holds a non-HTTP (NFS-style)
# path -- confirm against the full file.
454 def test_store_content_nfs
455 @backend.create_open = {
460 assert_raises MogileFS::UnsupportedPathError do
461 @client.store_content 'new_key', 'test', 'data'
# Verifies store_file with a large file: +to_put+ is filled with 1 MiB chunks
# of spaces and +expect+ with the PUT header followed by the same payload.
# The server records every byte it receives into +received+ and the running
# byte count into +readed+. The test checks the returned size, that the
# source file size is unchanged, the byte count seen by the server, and
# (when a +cmp+ executable is found on PATH) that +expect+ and +received+
# are byte-identical.
# NOTE(review): this listing omits lines; the assignment of +nr+, the loop
# producing the chunks and the update of the server-side counter are not
# visible here.
465 def test_new_file_http_large
466 expect = Tempfile.new('test_mogilefs.expect')
467 to_put = Tempfile.new('test_mogilefs.to_put')
468 received = Tempfile.new('test_mogilefs.received')
471 chunk_size = 1024 * 1024
472 expect_size = nr * chunk_size
474 header = "PUT /path HTTP/1.0\r\n" \
475 "Content-Length: #{expect_size}\r\n\r\n"
476 assert_equal header.size, expect.syswrite(header)
478 assert_equal chunk_size, expect.syswrite(' ' * chunk_size)
479 assert_equal chunk_size, to_put.syswrite(' ' * chunk_size)
481 assert_equal expect_size + header.size, expect.stat.size
482 assert_equal expect_size, to_put.stat.size
484 readed = Tempfile.new('readed')
485 t = tmpsrv(Proc.new do |serv, accept|
486 client, _ = serv.accept
490 buf = client.readpartial(8192) or break
491 break if buf.length == 0
492 assert_equal buf.length, received.syswrite(buf)
# stop once as many bytes as the expected request have been counted
494 break if nr >= expect.stat.size
# hand the byte count back to the test through a tempfile
496 readed.syswrite("#{nr}")
497 client.send("HTTP/1.0 200 OK\r\n\r\n", 0)
501 @backend.create_open = {
503 'path' => "http://127.0.0.1:#{t.port}/path",
506 orig_size = to_put.size
507 nr = @client.store_file('new_key', 'test', to_put.path)
508 assert nr, nr.inspect
509 assert_equal orig_size, nr
510 assert_equal orig_size, to_put.size
512 assert_equal expect.stat.size, readed.sysread(4096).to_i
# look for a cmp binary in each PATH entry and use it to compare the files
514 ENV['PATH'].split(/:/).each do |path|
515 cmp_bin = "#{path}/cmp"
516 File.executable?(cmp_bin) or next
517 # puts "running #{cmp_bin} #{expect.path} #{received.path}"
518 assert( system(cmp_bin, expect.path, received.path) )
# Verifies that store_content raises MogileFS::ReadOnlyError on a readonly
# client (the content argument is nil here).
523 def test_store_content_readonly
524 @client.readonly = true
526 assert_raises MogileFS::ReadOnlyError do
527 @client.store_content 'new_key', 'test', nil
# Verifies that store_file raises MogileFS::ReadOnlyError on a readonly
# client (the file argument is nil here).
531 def test_store_file_readonly
532 @client.readonly = true
533 assert_raises MogileFS::ReadOnlyError do
534 @client.store_file 'new_key', 'test', nil
# Verifies that rename('from_key', 'to_key') returns nil.
# NOTE(review): the backend setup (embedded lines 539-540) is not visible in
# this listing.
538 def test_rename_existing
541 assert_nil @client.rename('from_key', 'to_key')
# Verifies that rename raises MogileFS::Backend::UnknownKeyError when the
# backend's rename response is set to the pair 'unknown_key', ''.
544 def test_rename_nonexisting
545 @backend.rename = 'unknown_key', ''
547 assert_raises MogileFS::Backend::UnknownKeyError do
548 @client.rename('from_key', 'to_key')
# Verifies that rename raises MogileFS::Backend::NoKeyError when the
# backend's rename response is set to the pair 'no_key', 'no_key', and that
# the exception message is 'no_key'.
552 def test_rename_no_key
553 @backend.rename = 'no_key', 'no_key'
# assert_raises returns the exception so its message can be inspected below
555 e = assert_raises MogileFS::Backend::NoKeyError do
556 @client.rename 'new_key', 'test'
559 assert_equal 'no_key', e.message
# Verifies that rename raises MogileFS::ReadOnlyError on a readonly client
# and that the exception message is 'readonly mogilefs'.
562 def test_rename_readonly
563 @client.readonly = true
565 e = assert_raises MogileFS::ReadOnlyError do
566 @client.rename 'new_key', 'test'
569 assert_equal 'readonly mogilefs', e.message
# Assertion helper: issues get_paths("f", *args) against a throwaway TCP
# listener and checks the arguments that end up in the single request sent.
#
# expect - Hash of the decoded request arguments expected to remain after
#          "key" and "domain" have been removed.
# args   - extra arguments forwarded to get_paths after the key.
#
# NOTE(review): this listing omits lines; the code that accepts the
# connection and fills +received+ is not visible here.
572 def assert_get_paths_args(expect, *args)
# port 0 requests an ephemeral port; the assigned one is read from sock.addr[1]
573 sock = TCPServer.new("127.0.0.1", 0)
574 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
575 c = MogileFS::MogileFS.new(nargs)
581 a.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
584 paths_expect = %w(http://0/a http://0/b)
585 assert_equal paths_expect, c.get_paths("f", *args)
587 assert_equal 1, received.size
# the second whitespace-separated field of the request holds the encoded args
588 tmp = c.backend.url_decode(received[0].split(/\s+/)[1])
589 assert_equal "f", tmp.delete("key")
590 assert_equal "foo", tmp.delete("domain")
591 assert_equal expect, tmp
# Exercises the argument forms accepted by get_paths through
# assert_get_paths_args: no extra arguments, a positional boolean, a
# positional boolean plus zone, and option hashes with :noverify, :zone and
# :pathcount. The first argument of each call is the expected decoded
# request arguments (all values as strings).
597 def test_get_paths_args
598 assert_get_paths_args({"noverify"=>"1", "zone"=>""})
599 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, false)
600 assert_get_paths_args({"noverify"=>"0", "zone"=>""}, :noverify=>false)
601 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"}, true, "alt")
602 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt"},
603 {:noverify => true, :zone => "alt"})
604 assert_get_paths_args({"noverify"=>"1", "zone"=>"alt","pathcount"=>"666"},
605 {:noverify => true, :zone => "alt", :pathcount=>666})
# Verifies that get_paths("f") succeeds with two listeners configured as
# hosts, and that exactly two identical requests were recorded.
# NOTE(review): this listing omits lines; going by the test name, the first
# accepted connection is presumably closed without a reply (EOF) so that the
# client retries on the other host -- the accept/close code and the
# assignment of +ip+ are not visible here.
608 def test_idempotent_command_eof
610 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
611 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
612 args = { :hosts => hosts, :domain => "foo" }
613 c = MogileFS::MogileFS.new(args)
# wait for whichever listener the client connects to
616 r = IO.select([a, b])
621 r = IO.select([a, b])
624 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
627 expect = %w(http://0/a http://0/b)
628 assert_equal expect, c.get_paths("f")
630 assert_equal 2, received.size
631 assert_equal received[0], received[1]
# Verifies client behaviour when the tracker answers too slowly: the server
# side sleeps before replying on the first connection so the client times
# out, serves a second connection normally, and expects writes to the first
# connection to fail with Errno::EPIPE. The main thread expects
# MogileFS::UnreadableSocketError for the slow request and then a successful
# get_paths("b").
# NOTE(review): this listing omits many lines (thread/queue setup, +ip+,
# +timeout+, +secs+, the accepts and the rescue clauses), so the exact
# sequencing should be confirmed against the full file.
634 def test_idempotent_command_slow
636 a = TCPServer.new(ip, 0)
637 hosts = [ "#{ip}:#{a.addr[1]}" ]
640 args = { :hosts => hosts, :domain => "foo", :timeout => timeout }
641 c = MogileFS::MogileFS.new(args)
# capture the requested key into $1 for use in the reply
648 %r{key=(\w+)} =~ line
650 sleep(secs) # cause the client to timeout:
653 x.write("OK paths=1&path1=http://0/#{$1}\r\n")
655 # EPIPE may or may not get raised due to timing issue,
656 # we don't care either way
658 flunk("#{e.message} (#{e.class})")
662 # client should start a new connection here
666 %r{key=(\w+)} =~ line
668 y.write("OK paths=1&path1=http://0/#{$1}\r\n")
670 flunk("#{e.message} (#{e.class})")
673 # the client should've killed the old connection:
674 assert_raises(Errno::EPIPE) do
675 loop { x.write("OK paths=1&path1=http://0/#{$1}\r\n") }
678 close_later # main thread closes
680 assert_raises(MogileFS::UnreadableSocketError) do
683 assert_equal :continue_test, q.pop, "avoid race during test"
684 expect2 = %w(http://0/b)
685 assert_equal expect2, c.get_paths("b")
# th.value returns the thread's result: the IOs left for this thread to close
687 close_later = th.value
688 close_later.each { |io| assert_nil io.close }
# Verifies that get_paths("f") succeeds when the first response written by
# the server lacks its trailing "\r\n" (and is cut off mid-URL) while the
# second response is complete; exactly two identical requests must have been
# recorded.
# NOTE(review): this listing omits lines; the accept/close code around the
# two writes and the assignment of +ip+ are not visible here.
691 def test_idempotent_command_response_truncated
693 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
694 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
695 args = { :hosts => hosts, :domain => "foo" }
696 c = MogileFS::MogileFS.new(args)
699 r = IO.select([a, b])
# truncated reply: no line terminator
702 x.write("OK paths=2&path1=http://0/a&path2=http://0/")
705 r = IO.select([a, b])
# complete reply
708 x.write("OK paths=2&path1=http://0/a&path2=http://0/b\r\n")
711 expect = %w(http://0/a http://0/b)
712 assert_equal expect, c.get_paths("f")
714 assert_equal 2, received.size
715 assert_equal received[0], received[1]
# Verifies that rename("a", "b") raises EOFError and that only one request
# was recorded, i.e. the command was not sent a second time.
# NOTE(review): this listing omits lines; the code that accepts the
# connection, records the request and closes the socket is not visible here.
718 def test_non_idempotent_command_eof
720 a, b = TCPServer.new(ip, 0), TCPServer.new(ip, 0)
721 hosts = [ "#{ip}:#{a.addr[1]}", "#{ip}:#{b.addr[1]}" ]
722 args = { :hosts => hosts, :domain => "foo" }
723 c = MogileFS::MogileFS.new(args)
726 r = IO.select([a, b])
731 assert_raises(EOFError) { c.rename("a", "b") }
733 assert_equal 1, received.size
# Verifies that list_keys_verbose invokes its callback in the order the keys
# were given (a..e): the server side checks that "file_info" requests arrive
# in key order, then writes its replies while iterating the keys in shuffled
# order.
# NOTE(review): this listing omits lines; how each reply is tied to its key
# (presumably by adding it to +out+) and the accept/thread setup are not
# visible here.
736 def test_list_keys_verbose_ordering # implementation detail
738 sock = TCPServer.new("127.0.0.1", 0)
739 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
740 c = MogileFS::MogileFS.new(nargs)
# requests are expected one per key, in order
743 %w(a b c d e).each do |key|
745 cmd, args = line.split(/\s+/, 2)
746 args = c.backend.url_decode(args.strip)
747 assert_equal "file_info", cmd
748 assert_equal key, args["key"]
750 out = { "length" => 3, "devcount" => 6 }
# replies are written in random key order
751 %w(a b c d e).shuffle.each do |key|
753 a.write "OK #{c.backend.url_encode(out)}\r\n"
758 blk = lambda do |key, length, devcount|
759 received << [ key, length, devcount ]
761 c.list_keys_verbose(%w(a b c d e), blk)
# keep only the key of each recorded callback invocation
763 received.map! { |(key,_,_)| key }
764 assert_equal %w(a b c d e), received
# Verifies that list_keys_verbose recovers from an EOF in the middle of a
# batch: the server reads "file_info" requests for a..e, replies while
# iterating d and e, closes the connection, then accepts a new connection on
# which it expects requests for a, b and c and answers each. The callback
# must still see the keys in the order a..e.
# NOTE(review): this listing omits lines; how each reply is tied to its key
# and the initial accept/thread setup are not visible here.
769 def test_list_keys_verbose_retry_eof # implementation detail
771 sock = TCPServer.new("127.0.0.1", 0)
772 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
773 c = MogileFS::MogileFS.new(nargs)
776 %w(a b c d e).each do |key|
778 cmd, args = line.split(/\s+/, 2)
779 args = c.backend.url_decode(args.strip)
780 assert_equal "file_info", cmd
781 assert_equal key, args["key"]
783 out = { "length" => 3, "devcount" => 6 }
# only part of the batch is answered before the connection is dropped
784 %w(d e).each do |key|
786 a.write "OK #{c.backend.url_encode(out)}\r\n"
788 a.close # trigger EOF
789 a = sock.accept # client will retry
# the retried requests must cover the keys not yet answered
790 %w(a b c).each do |key|
792 cmd, args = line.split(/\s+/, 2)
793 args = c.backend.url_decode(args.strip)
794 assert_equal "file_info", cmd
795 assert_equal key, args["key"]
797 a.write "OK #{c.backend.url_encode(out)}\r\n"
802 blk = lambda do |key, length, devcount|
803 received << [ key, length, devcount ]
805 c.list_keys_verbose(%w(a b c d e), blk)
807 received.map! { |(key,_,_)| key }
808 assert_equal %w(a b c d e), received
# Verifies that list_keys_verbose recovers from a truncated response: the
# server reads "file_info" requests for a..e, writes one complete reply and
# one reply without its trailing "\r\n", closes the connection, then accepts
# a new connection on which it expects requests for b, c, d and e and answers
# each. The callback must still see the keys in the order a..e.
# NOTE(review): this listing omits lines; how each reply is tied to its key
# and the initial accept/thread setup are not visible here.
813 def test_list_keys_verbose_retry_truncated # implementation detail
815 sock = TCPServer.new("127.0.0.1", 0)
816 nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
817 c = MogileFS::MogileFS.new(nargs)
820 %w(a b c d e).each do |key|
822 cmd, args = line.split(/\s+/, 2)
823 args = c.backend.url_decode(args.strip)
824 assert_equal "file_info", cmd
825 assert_equal key, args["key"]
827 out = { "length" => 3, "devcount" => 6 }
# one well-formed reply ...
829 a.write "OK #{c.backend.url_encode(out)}\r\n"
# ... followed by a reply that is missing its line terminator
831 a.write "OK #{c.backend.url_encode(out)}"
832 a.close # trigger EOF
834 a = sock.accept # client will retry
835 %w(b c d e).each do |key|
837 cmd, args = line.split(/\s+/, 2)
838 args = c.backend.url_decode(args.strip)
839 assert_equal "file_info", cmd
840 assert_equal key, args["key"]
842 a.write "OK #{c.backend.url_encode(out)}\r\n"
847 blk = lambda do |key, length, devcount|
848 received << [ key, length, devcount ]
850 c.list_keys_verbose(%w(a b c d e), blk)
852 received.map! { |(key,_,_)| key }
853 assert_equal %w(a b c d e), received
860 assert_equal({}, @client.sleep(2))
864 @tmpsrv.each { |t| t.destroy! }
869 # tested with 1000, though it takes a while
871 ENV['NR_CHUNKS'] ? ENV['NR_CHUNKS'].to_i : 10