models: Remove "cleared" and "processed" fields from Jobs.
[ganeti_webmgr.git] / ganeti_web / tests / job.py
blob56701ddf11c41ab1c83522c28931ac021a3a2b15
# Copyright (C) 2010 Oregon State University et al.

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
18 from django.test import TestCase
19 from django.test.client import Client
21 from django_test_tools.views import ViewTestMixin
22 from django_test_tools.users import UserTestMixin
24 from ganeti_web.util.proxy import RapiProxy, CallProxy
25 from ganeti_web.util.proxy.constants import JOB, JOB_RUNNING, JOB_ERROR
26 from ganeti_web import models
27 from ganeti_web.tests.views.virtual_machine.base import VirtualMachineTestCaseMixin
# Short module-level aliases for the model classes used by the tests below.
Job = models.Job
Cluster = models.Cluster
VirtualMachine = models.VirtualMachine
class TestJobMixin(VirtualMachineTestCaseMixin):
    """
    Shared fixture for the Job tests: swaps in the RAPI proxy and provides
    ``self.vm`` and ``self.cluster``.
    """

    def setUp(self):
        """
        Replace the Ganeti RAPI client class with RapiProxy and create the
        virtual machine / cluster pair used by the tests.
        """
        models.client.GanetiRapiClient = RapiProxy

        vm, cluster = self.create_virtual_machine()
        self.vm = vm
        self.cluster = cluster

    def tearDown(self):
        """
        Delete the objects created in setUp(), virtual machine first.
        """
        for created in (self.vm, self.cluster):
            created.delete()

    def test_trivial(self):
        """
        Exercise setUp() and tearDown() only; the test body is intentionally
        empty.
        """
class TestJobModel(TestJobMixin, TestCase):
    """
    Model-level tests for Job: construction, saving/loading, cluster hash
    propagation and cache handling.
    """

    def test_instantiation(self):
        """
        Test instantiating a Job
        """
        Job()

    def test_non_trivial(self):
        """
        Test instantiating a Job with extra parameters
        """
        return Job(job_id=1, cluster=self.cluster, obj=self.vm)

    def test_save(self):
        """
        Test saving a Job

        Verify:
            * Job can be saved
            * Job can be loaded
            * Loaded Job carries info and reports no error

        Returns the reloaded Job; other tests in this class call this method
        to obtain a saved Job.
        """
        job = Job(job_id=1, cluster=self.cluster, obj=self.vm)
        job.save()

        job = Job.objects.get(id=job.id)
        # This used to be assertFalse(None, job.info), which could never
        # fail: None was the value under test and job.info was only the
        # failure message.
        # NOTE(review): assumes the intent was "info is populated after a
        # reload" -- confirm against Job.load_info().
        self.assertTrue(job.info is not None, 'Job info was not loaded')
        self.assertFalse(job.error)
        return job

    def test_hash_update(self):
        """
        When cluster is saved hash for its Jobs should be updated
        """
        job1 = self.test_save()
        job2 = self.test_save()

        self.assertEqual(self.cluster.hash, job1.cluster_hash)
        self.assertEqual(self.cluster.hash, job2.cluster_hash)

        # change cluster's hash
        self.cluster.hostname = 'SomethingDifferent'
        self.cluster.save()

        # reload both jobs so the comparison uses the stored values
        job1 = Job.objects.get(pk=job1.id)
        job2 = Job.objects.get(pk=job2.id)
        self.assertEqual(job1.cluster_hash, self.cluster.hash,
                         'Job does not have updated cache')
        self.assertEqual(job2.cluster_hash, self.cluster.hash,
                         'Job does not have updated cache')

    def _assert_cache_reset(self, final_status):
        """
        Shared body of the cache reset tests.

        Drives a saved Job through two loads while the RAPI proxy answers
        JOB_RUNNING, then switches the answer to ``final_status`` and checks
        that the Job refreshes once more, clears ``ignore_cache`` and stops
        refreshing afterwards.

        final_status: the response GetJobStatus should return for the
            finished job (JOB or JOB_ERROR in the callers below).
        """
        job = self.test_save()
        job.ignore_cache = True
        job.save()
        rapi = job.rapi
        rapi.GetJobStatus.response = JOB_RUNNING
        # patch _refresh so calls to it can be asserted and reset
        CallProxy.patch(job, '_refresh')

        # two consecutive loads with running status, each should refresh
        for _ in range(2):
            job.load_info()
            self.assertTrue(job.ignore_cache)
            job._refresh.assertCalled(self)
            job._refresh.reset()

        # load with the final status, should refresh and flip cache flag
        rapi.GetJobStatus.response = final_status
        job.load_info()
        self.assertFalse(job.ignore_cache)
        job._refresh.assertCalled(self)
        job._refresh.reset()

        # load again with the final status, should use cache
        job.load_info()
        self.assertFalse(job.ignore_cache)
        job._refresh.assertNotCalled(self)

    def test_cache_reset(self):
        """
        Tests that cache reset is working properly.

        Verifies:
            * when success status is achieved the job no longer updates
        """
        self._assert_cache_reset(JOB)

    def test_cache_reset_error(self):
        """
        Tests that cache reset is working properly.

        Verifies:
            * when error status is achieved the job no longer updates
        """
        self._assert_cache_reset(JOB_ERROR)
class TestJobViews(TestJobMixin, TestCase, UserTestMixin, ViewTestMixin):
    """
    View tests for clearing jobs and for the job detail page.
    """

    # URL pattern of the clear view; filled with (cluster slug, job id).
    _CLEAR_URL = '/cluster/%s/job/%s/clear/'

    def setUp(self):
        """
        Build the shared fixture, create the test users, grant the
        object-level permissions the tests rely on and create a test client.
        """
        super(TestJobViews, self).setUp()

        self.create_standard_users()
        self.create_users(['user', 'vm_owner', 'cluster_admin', 'vm_admin'])

        # additional perms
        self.cluster_admin.grant('admin', self.cluster)
        self.vm_admin.grant('admin', self.vm)
        self.vm.owner = self.vm_owner.get_profile()
        self.vm.save()

        self.c = Client()

    def tearDown(self):
        """
        Delete the users created in setUp(), then tear down the shared
        fixture.
        """
        # Tear down users.
        self.unauthorized.delete()
        self.superuser.delete()
        self.user.delete()
        self.vm_owner.delete()
        self.cluster_admin.delete()
        self.vm_admin.delete()

        super(TestJobViews, self).tearDown()

    def _create_error_job(self, obj, job_id):
        """
        Create a Job on self.cluster for ``obj`` whose info is JOB_ERROR and
        return it freshly reloaded from the database.

        Reloading by the new job's own pk is what keeps two jobs created
        back to back from being confused with each other.
        """
        job = Job.objects.create(cluster=self.cluster, obj=obj, job_id=job_id)
        job.info = JOB_ERROR
        job.save()
        return Job.objects.get(pk=job.pk)

    def _set_last_job(self, obj, job):
        """
        Record ``job`` as the last job of ``obj``, set its ignore_cache flag
        and save it.
        """
        obj.last_job = job
        obj.ignore_cache = True
        obj.save()

    def _assert_cluster_job_cleared_by(self, user):
        """
        Post to the clear view as ``user`` for an errored cluster job and
        verify the job is deleted and the cluster's last_job / ignore_cache
        are reset.
        """
        c_error = self._create_error_job(self.cluster, 1)
        self._set_last_job(self.cluster, c_error)

        args = (self.cluster.slug, c_error.job_id)

        # authorized for cluster
        def tests(user, response):
            self.assertFalse(Job.objects.filter(pk=c_error.pk).exists())
            row = Cluster.objects.filter(pk=self.cluster.pk)
            updated = row.values('last_job_id', 'ignore_cache')[0]
            self.assertEqual(None, updated['last_job_id'])
            self.assertFalse(updated['ignore_cache'])

        self.assert_200(self._CLEAR_URL, args, users=[user],
                        data={'id': c_error.pk}, tests=tests, method='post',
                        mime='application/json')

    def test_clear_job(self):
        """
        Clearing a job fails for unauthorized users and leaves the object
        untouched when the job is not the object's current job.
        """
        url = self._CLEAR_URL

        c_error = self._create_error_job(self.cluster, 1)
        self._set_last_job(self.cluster, c_error)
        vm_error = self._create_error_job(self.vm, 2)
        self._set_last_job(self.vm, vm_error)

        # standard errors
        args = (self.cluster.slug, c_error.job_id)
        self.assert_standard_fails(url, args, method='post')

        # not authorized for cluster
        self.assert_403(url, args, users=[self.vm_admin, self.vm_owner],
                        data={'id': c_error.pk}, method='post')

        # does not clear job if it is not the current job: job 3 is the
        # current job, job 4 is the one the request tries to clear.  The
        # second job used to be reloaded with the first job's pk, so the
        # request targeted the current job instead.
        vm_error = self._create_error_job(self.vm, 3)
        vm_error2 = self._create_error_job(self.vm, 4)
        self._set_last_job(self.vm, vm_error)

        # NOTE(review): self.c is never logged in within this class, so this
        # request may be rejected before reaching the clear logic -- confirm
        # whether a login is intended here.
        self.c.post(url % (self.cluster.slug, vm_error2.job_id))
        row = VirtualMachine.objects.filter(pk=self.vm.pk)
        updated = row.values('last_job_id', 'ignore_cache')[0]
        self.assertEqual(vm_error.pk, updated['last_job_id'])
        self.assertTrue(updated['ignore_cache'])

    def test_clear_job_superuser(self):
        """
        A superuser can clear an errored cluster job.
        """
        self._assert_cluster_job_cleared_by(self.superuser)

    def test_clear_job_authorized_cluster(self):
        """
        A cluster admin can clear an errored cluster job.
        """
        self._assert_cluster_job_cleared_by(self.cluster_admin)

    def test_clear_job_authorized_vm(self):
        """
        Superuser, cluster admin, VM admin and VM owner can each clear an
        errored virtual machine job.
        """
        url = self._CLEAR_URL

        # The job is expected to be deleted by a successful clear, so every
        # user gets a freshly created job.
        for user in [self.superuser, self.cluster_admin, self.vm_admin,
                     self.vm_owner]:

            vm_error = self._create_error_job(self.vm, 2)
            self._set_last_job(self.vm, vm_error)

            args = (self.cluster.slug, vm_error.job_id)

            # job_pk is bound as a default so the callback keeps referring
            # to this iteration's job.
            def tests(user, response, job_pk=vm_error.pk):
                qs = Job.objects.filter(pk=job_pk)
                self.assertFalse(qs.exists(), "job error was not deleted")
                row = VirtualMachine.objects.filter(pk=self.vm.pk)
                updated = row.values('last_job_id', 'ignore_cache')[0]
                self.assertEqual(None, updated['last_job_id'])
                self.assertFalse(updated['ignore_cache'])

            self.assert_200(url, args, users=[user],
                            data={'id': vm_error.id}, tests=tests,
                            method='post', mime='application/json')

    def test_job_detail(self):
        """
        tests viewing job detail
        """
        c_error = self._create_error_job(self.cluster, 1)

        url = '/cluster/%s/job/%s/detail/'
        args = (self.cluster.slug, c_error.job_id)

        self.assert_standard_fails(url, args, authorized=False)
        self.assert_200(url, args, users=[self.superuser, self.cluster_admin],
                        template='ganeti/job/detail.html')