1 # Copyright (C) 2010 Oregon State University et al.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
18 from django
.test
import TestCase
19 from django
.test
.client
import Client
21 from django_test_tools
.views
import ViewTestMixin
22 from django_test_tools
.users
import UserTestMixin
24 from ganeti_web
.util
.proxy
import RapiProxy
, CallProxy
25 from ganeti_web
.util
.proxy
.constants
import JOB
, JOB_RUNNING
, JOB_ERROR
26 from ganeti_web
import models
27 from ganeti_web
.tests
.views
.virtual_machine
.base
import VirtualMachineTestCaseMixin
30 Cluster
= models
.Cluster
31 VirtualMachine
= models
.VirtualMachine
35 class TestJobMixin(VirtualMachineTestCaseMixin
):
38 models
.client
.GanetiRapiClient
= RapiProxy
40 self
.vm
, self
.cluster
= self
.create_virtual_machine()
46 def test_trivial(self
):
48 Test the setUp() and tearDown() methods.
54 class TestJobModel(TestJobMixin
, TestCase
):
56 def test_instantiation(self
):
58 Test instantiating a Job
62 def test_non_trivial(self
):
64 Test instantiating a Job with extra parameters
66 return Job(job_id
=1, cluster
=self
.cluster
, obj
=self
.vm
)
75 * Hash is copied from cluster
77 job
= Job(job_id
=1, cluster
=self
.cluster
, obj
=self
.vm
)
80 job
= Job
.objects
.get(id=job
.id)
81 self
.assertFalse(None, job
.info
)
82 self
.assertFalse(job
.error
)
85 def test_hash_update(self
):
87 When cluster is saved hash for its Jobs should be updated
89 job1
= self
.test_save()
90 job2
= self
.test_save()
92 self
.assertEqual(self
.cluster
.hash, job1
.cluster_hash
)
93 self
.assertEqual(self
.cluster
.hash, job2
.cluster_hash
)
95 # change cluster's hash
96 self
.cluster
.hostname
= 'SomethingDifferent'
98 job1
= Job
.objects
.get(pk
=job1
.id)
99 job2
= Job
.objects
.get(pk
=job2
.id)
100 self
.assertEqual(job1
.cluster_hash
, self
.cluster
.hash,
101 'Job does not have updated cache')
102 self
.assertEqual(job2
.cluster_hash
, self
.cluster
.hash,
103 'Job does not have updated cache')
105 def test_cache_reset(self
):
107 Tests that cache reset is working properly.
110 * when success or error status is achieved the job no longer updates
112 job
= self
.test_save()
113 job
.ignore_cache
= True
116 rapi
.GetJobStatus
.response
= JOB_RUNNING
117 CallProxy
.patch(job
, '_refresh')
119 # load with running status, should refresh
121 self
.assertTrue(job
.ignore_cache
)
122 job
._refresh
.assertCalled(self
)
125 # load again with running status, should refresh
127 self
.assertTrue(job
.ignore_cache
)
128 job
._refresh
.assertCalled(self
)
131 # load again with success status, should refresh and flip cache flag
132 rapi
.GetJobStatus
.response
= JOB
134 self
.assertFalse(job
.ignore_cache
)
135 job
._refresh
.assertCalled(self
)
138 # load again with success status, should use cache
140 self
.assertFalse(job
.ignore_cache
)
141 job
._refresh
.assertNotCalled(self
)
143 def test_cache_reset_error(self
):
145 Tests that cache reset is working properly.
148 * when success or error status is achieved the job no longer updates
150 job
= self
.test_save()
151 job
.ignore_cache
= True
154 rapi
.GetJobStatus
.response
= JOB_RUNNING
155 CallProxy
.patch(job
, '_refresh')
157 # load with running status, should refresh
159 self
.assertTrue(job
.ignore_cache
)
160 job
._refresh
.assertCalled(self
)
163 # load again with running status, should refresh
165 self
.assertTrue(job
.ignore_cache
)
166 job
._refresh
.assertCalled(self
)
169 # load again with success status, should refresh and flip cache flag
170 rapi
.GetJobStatus
.response
= JOB_ERROR
172 self
.assertFalse(job
.ignore_cache
)
173 job
._refresh
.assertCalled(self
)
176 # load again with success status, should use cache
178 self
.assertFalse(job
.ignore_cache
)
179 job
._refresh
.assertNotCalled(self
)
182 class TestJobViews(TestJobMixin
, TestCase
, UserTestMixin
, ViewTestMixin
):
185 super(TestJobViews
, self
).setUp()
187 self
.create_standard_users()
188 self
.create_users(['user', 'vm_owner', 'cluster_admin', 'vm_admin'])
191 self
.cluster_admin
.grant('admin', self
.cluster
)
192 self
.vm_admin
.grant('admin', self
.vm
)
193 self
.vm
.owner
= self
.vm_owner
.get_profile()
200 self
.unauthorized
.delete()
201 self
.superuser
.delete()
203 self
.vm_owner
.delete()
204 self
.cluster_admin
.delete()
205 self
.vm_admin
.delete()
207 super(TestJobViews
, self
).tearDown()
209 def test_clear_job(self
):
211 url
= '/cluster/%s/job/%s/clear/'
213 c_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.cluster
,
215 c_error
.info
= JOB_ERROR
217 c_error
= Job
.objects
.get(pk
=c_error
.pk
)
218 self
.cluster
.last_job
= c_error
219 self
.cluster
.ignore_cache
= True
221 vm_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.vm
,
223 vm_error
.info
= JOB_ERROR
225 vm_error
= Job
.objects
.get(pk
=vm_error
.pk
)
226 self
.vm
.last_job
= vm_error
227 self
.vm
.ignore_cache
= True
231 args
= (self
.cluster
.slug
, c_error
.job_id
)
232 self
.assert_standard_fails(url
, args
, method
='post')
234 # not authorized for cluster
235 self
.assert_403(url
, args
, users
=[self
.vm_admin
, self
.vm_owner
],
236 data
={'id':c_error
.pk
}, method
='post')
238 # does not clear job if it is not the the current job
239 vm_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.vm
,
241 vm_error
.info
= JOB_ERROR
243 vm_error
= Job
.objects
.get(pk
=vm_error
.pk
)
244 vm_error2
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.vm
,
246 vm_error2
.info
= JOB_ERROR
248 vm_error2
= Job
.objects
.get(pk
=vm_error
.pk
)
249 self
.vm
.last_job
= vm_error
250 self
.vm
.ignore_cache
= True
253 self
.c
.post(url
% (self
.cluster
.slug
, vm_error2
.job_id
))
254 updated
= VirtualMachine
.objects
.filter(pk
=self
.vm
.pk
).values('last_job_id','ignore_cache')[0]
255 self
.assertEqual(vm_error
.pk
, updated
['last_job_id'])
256 self
.assertTrue(updated
['ignore_cache'])
258 def test_clear_job_superuser(self
):
260 url
= '/cluster/%s/job/%s/clear/'
262 c_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.cluster
,
264 c_error
.info
= JOB_ERROR
266 c_error
= Job
.objects
.get(pk
=c_error
.pk
)
267 self
.cluster
.last_job
= c_error
268 self
.cluster
.ignore_cache
= True
271 args
= (self
.cluster
.slug
, c_error
.job_id
)
273 # authorized for cluster
274 def tests(user
, response
):
275 qs
= Job
.objects
.filter(pk
=c_error
.pk
)
277 updated
= Cluster
.objects
.filter(pk
=self
.cluster
.pk
).values('last_job_id','ignore_cache')[0]
278 self
.assertEqual(None, updated
['last_job_id'])
279 self
.assertFalse(updated
['ignore_cache'])
280 self
.assert_200(url
, args
, users
=[self
.superuser
],
281 data
={'id':c_error
.pk
}, tests
=tests
, method
='post',
282 mime
='application/json')
284 def test_clear_job_authorized_cluster(self
):
286 url
= '/cluster/%s/job/%s/clear/'
288 c_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.cluster
,
290 c_error
.info
= JOB_ERROR
292 c_error
= Job
.objects
.get(pk
=c_error
.pk
)
293 self
.cluster
.last_job
= c_error
294 self
.cluster
.ignore_cache
= True
297 args
= (self
.cluster
.slug
, c_error
.job_id
)
299 # authorized for cluster
300 def tests(user
, response
):
301 qs
= Job
.objects
.filter(pk
=c_error
.pk
)
303 updated
= Cluster
.objects
.filter(pk
=self
.cluster
.pk
).values('last_job_id','ignore_cache')[0]
304 self
.assertEqual(None, updated
['last_job_id'])
305 self
.assertFalse(updated
['ignore_cache'])
306 self
.assert_200(url
, args
, users
=[self
.cluster_admin
],
307 data
={'id':c_error
.pk
}, tests
=tests
, method
='post',
308 mime
='application/json')
310 def test_clear_job_authorized_vm(self
):
312 url
= '/cluster/%s/job/%s/clear/'
314 # XXX ugh, sorry for this!
315 for user
in [self
.superuser
, self
.cluster_admin
, self
.vm_admin
,
318 vm_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.vm
,
320 vm_error
.info
= JOB_ERROR
322 vm_error
= Job
.objects
.get(pk
=vm_error
.pk
)
323 self
.vm
.last_job
= vm_error
324 self
.vm
.ignore_cache
= True
327 args
= (self
.cluster
.slug
, vm_error
.job_id
)
329 def tests(user
, response
):
330 qs
= Job
.objects
.filter(pk
=vm_error
.pk
)
331 self
.assertFalse(qs
.exists(), "job error was not deleted")
332 updated
= VirtualMachine
.objects
.filter(pk
=self
.vm
.pk
).values('last_job_id','ignore_cache')[0]
333 self
.assertEqual(None, updated
['last_job_id'])
334 self
.assertFalse(updated
['ignore_cache'])
335 self
.assert_200(url
, args
, users
=[user
], data
={'id':vm_error
.id},
336 tests
=tests
, method
='post',
337 mime
='application/json')
339 def test_job_detail(self
):
341 tests viewing job detail
344 c_error
= Job
.objects
.create(cluster
=self
.cluster
, obj
=self
.cluster
,
346 c_error
.info
= JOB_ERROR
349 url
= '/cluster/%s/job/%s/detail/'
350 args
= (self
.cluster
.slug
, c_error
.job_id
)
352 self
.assert_standard_fails(url
, args
, authorized
=False)
353 self
.assert_200(url
, args
, users
=[self
.superuser
, self
.cluster_admin
],
354 template
='ganeti/job/detail.html')