1 #!/usr/bin/env python2.4
2 from basetest
import BaseTest
3 import sys
, tempfile
, os
4 from StringIO
import StringIO
5 import unittest
, signal
6 from logging
import getLogger
, WARN
, ERROR
8 sys
.path
.insert(0, '..')
10 # If http_proxy is set it can cause the download tests to fail.
11 os
.environ
["http_proxy"] = ""
13 from zeroinstall
.injector
import model
, autopolicy
, gpg
, iface_cache
, download
, reader
, trust
, handler
, background
, arch
, selections
, qdom
14 from zeroinstall
.zerostore
import Store
; Store
._add
_with
_helper
= lambda *unused
: False
15 from zeroinstall
.support
import basedir
, tasks
25 background
._detach
= lambda: False
26 background
._exec
_gui
= raise_gui
27 sys
.modules
['dbus'] = my_dbus
28 sys
.modules
['dbus.glib'] = my_dbus
29 my_dbus
.types
= my_dbus
30 sys
.modules
['dbus.types'] = my_dbus
33 def __init__(self
, reply
):
39 class DummyHandler(handler
.Handler
):
40 __slots__
= ['ex', 'tb']
43 handler
.Handler
.__init
__(self
)
46 def wait_for_blocker(self
, blocker
):
48 handler
.Handler
.wait_for_blocker(self
, blocker
)
50 raise self
.ex
, None, self
.tb
52 def report_error(self
, ex
, tb
= None):
53 assert self
.ex
is None, self
.ex
58 #traceback.print_exc()
60 class TestDownload(BaseTest
):
64 stream
= tempfile
.TemporaryFile()
65 stream
.write(data
.thomas_key
)
67 gpg
.import_key(stream
)
70 trust
.trust_db
.watchers
= []
73 BaseTest
.tearDown(self
)
74 if self
.child
is not None:
75 os
.kill(self
.child
, signal
.SIGTERM
)
76 os
.waitpid(self
.child
, 0)
79 def testRejectKey(self
):
82 sys
.stdout
= StringIO()
83 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
84 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
85 handler
= DummyHandler())
86 assert policy
.need_download()
87 sys
.stdin
= Reply("N\n")
89 policy
.download_and_execute(['Hello'])
91 except model
.SafeException
, ex
:
92 if "Not signed with a trusted key" not in str(ex
):
97 def testRejectKeyXML(self
):
100 sys
.stdout
= StringIO()
101 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
102 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False,
103 handler
= DummyHandler())
104 assert policy
.need_download()
105 sys
.stdin
= Reply("N\n")
107 policy
.download_and_execute(['Hello'])
109 except model
.SafeException
, ex
:
110 if "Not signed with a trusted key" not in str(ex
):
115 def testImport(self
):
118 from zeroinstall
.injector
import cli
120 rootLogger
= getLogger()
121 rootLogger
.disabled
= True
124 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
126 except model
.SafeException
, ex
:
127 assert 'NO-SUCH-FILE' in str(ex
)
129 rootLogger
.disabled
= False
130 rootLogger
.setLevel(WARN
)
132 hello
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello')
133 self
.assertEquals(0, len(hello
.implementations
))
135 sys
.stdout
= StringIO()
136 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
137 sys
.stdin
= Reply("Y\n")
139 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
140 cli
.main(['--import', 'Hello'])
141 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
143 # Check we imported the interface after trusting the key
144 reader
.update_from_cache(hello
)
145 self
.assertEquals(1, len(hello
.implementations
))
147 # Shouldn't need to prompt the second time
149 cli
.main(['--import', 'Hello'])
153 def testSelections(self
):
154 from zeroinstall
.injector
.cli
import _download_missing_selections
155 root
= qdom
.parse(file("selections.xml"))
156 sels
= selections
.Selections(root
)
157 class Options
: dry_run
= False
161 sys
.stdout
= StringIO()
162 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
163 sys
.stdin
= Reply("Y\n")
164 _download_missing_selections(Options(), sels
)
165 path
= iface_cache
.iface_cache
.stores
.lookup(sels
.selections
['http://localhost:8000/Hello.xml'].id)
166 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
168 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
172 def testSelectionsWithFeed(self
):
173 from zeroinstall
.injector
.cli
import _download_missing_selections
174 root
= qdom
.parse(file("selections.xml"))
175 sels
= selections
.Selections(root
)
176 class Options
: dry_run
= False
180 sys
.stdout
= StringIO()
181 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
182 sys
.stdin
= Reply("Y\n")
184 from zeroinstall
.injector
import fetch
185 from zeroinstall
.injector
.handler
import Handler
187 fetcher
= fetch
.Fetcher(handler
)
188 handler
.wait_for_blocker(fetcher
.download_and_import_feed('http://localhost:8000/Hello.xml', iface_cache
.iface_cache
))
190 _download_missing_selections(Options(), sels
)
191 path
= iface_cache
.iface_cache
.stores
.lookup(sels
.selections
['http://localhost:8000/Hello.xml'].id)
192 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
194 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
198 def testAcceptKey(self
):
201 sys
.stdout
= StringIO()
202 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
203 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
204 handler
= DummyHandler())
205 assert policy
.need_download()
206 sys
.stdin
= Reply("Y\n")
208 policy
.download_and_execute(['Hello'], main
= 'Missing')
210 except model
.SafeException
, ex
:
211 if "HelloWorld/Missing" not in str(ex
):
216 def testRecipe(self
):
219 sys
.stdout
= StringIO()
220 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
221 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False)
223 policy
.download_and_execute([])
225 except model
.SafeException
, ex
:
226 if "HelloWorld/Missing" not in str(ex
):
231 def testSymlink(self
):
234 sys
.stdout
= StringIO()
235 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
236 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('RecipeSymlink.xml'), download_only
= False,
237 handler
= DummyHandler())
239 policy
.download_and_execute([])
241 except model
.SafeException
, ex
:
242 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
244 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
248 def testAutopackage(self
):
251 sys
.stdout
= StringIO()
252 self
.child
= server
.handle_requests('HelloWorld.autopackage')
253 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Autopackage.xml'), download_only
= False)
255 policy
.download_and_execute([])
257 except model
.SafeException
, ex
:
258 if "HelloWorld/Missing" not in str(ex
):
263 def testRecipeFailure(self
):
266 sys
.stdout
= StringIO()
267 self
.child
= server
.handle_requests('*')
268 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False,
269 handler
= DummyHandler())
271 policy
.download_and_execute([])
273 except download
.DownloadError
, ex
:
274 if "Connection" not in str(ex
):
279 def testMirrors(self
):
282 sys
.stdout
= StringIO()
283 getLogger().setLevel(ERROR
)
284 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
285 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
286 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False)
287 policy
.fetcher
.feed_mirror
= 'http://localhost:8000/0mirror'
289 refreshed
= policy
.solve_with_downloads()
290 policy
.handler
.wait_for_blocker(refreshed
)
295 def testReplay(self
):
298 sys
.stdout
= StringIO()
299 getLogger().setLevel(ERROR
)
300 iface
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello.xml')
301 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
302 iface_cache
.iface_cache
.update_interface_from_network(iface
, file('Hello-new.xml').read(), mtime
+ 10000)
304 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
305 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
306 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False)
307 policy
.fetcher
.feed_mirror
= 'http://localhost:8000/0mirror'
309 # Update from mirror (should ignore out-of-date timestamp)
310 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
311 policy
.handler
.wait_for_blocker(refreshed
)
313 # Update from upstream (should report an error)
314 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
316 policy
.handler
.wait_for_blocker(refreshed
)
317 raise Exception("Should have been rejected!")
318 except model
.SafeException
, ex
:
319 assert "New interface's modification time is before old version" in str(ex
)
321 # Must finish with the newest version
322 self
.assertEquals(1209206132, iface_cache
.iface_cache
._get
_signature
_date
(iface
.uri
))
326 def testBackground(self
, verbose
= False):
327 p
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml')
328 reader
.update(iface_cache
.iface_cache
.get_interface(p
.root
), 'Hello.xml')
330 p
.network_use
= model
.network_minimal
331 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
335 def choose_download(registed_cb
, nid
, actions
):
337 assert actions
== ['download', 'Download'], actions
338 registed_cb(nid
, 'download')
341 traceback
.print_exc()
348 sys
.stdout
= StringIO()
349 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
350 my_dbus
.user_callback
= choose_download
354 # The background handler runs in the same process
355 # as the tests, so don't let it abort.
356 if os
.getpid() == pid
:
357 raise SystemExit(code
)
358 # But, child download processes are OK
363 background
.spawn_background_update(p
, verbose
)
365 except SystemExit, ex
:
366 self
.assertEquals(1, ex
.code
)
373 def testBackgroundVerbose(self
):
374 self
.testBackground(verbose
= True)
376 suite
= unittest
.makeSuite(TestDownload
)
377 if __name__
== '__main__':
378 sys
.argv
.append('-v')