Merge branch 'develop' into feature/search_autocomplete_haste
[ganeti_webmgr.git] / ganeti / tests / models / virtual_machine.py
blob1b2aa9ffcd222f8274db950c2c79a0374b0189c5
1 # Copyright (C) 2010 Oregon State University et al.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
16 # USA.
19 from datetime import datetime
21 from django.contrib.auth.models import User, Group
22 from django.db.utils import IntegrityError
23 from django.test import TestCase
25 from ganeti.tests.rapi_proxy import (RapiProxy, INSTANCE, JOB, JOB_RUNNING,
26 JOB_DELETE_SUCCESS)
27 from ganeti import models, constants
# Short module-level aliases for the model classes used by the tests below.
Cluster = models.Cluster
ClusterUser = models.ClusterUser
Job = models.Job
Node = models.Node
SSHKey = models.SSHKey
VirtualMachine = models.VirtualMachine

# Names exported by this test module.
__all__ = (
    'TestVirtualMachineModel',
    'VirtualMachineTestCaseMixin',
)
class VirtualMachineTestCaseMixin(object):
    """
    Mixin for test cases that need a saved VirtualMachine and its Cluster.
    """

    def create_virtual_machine(self, cluster=None, hostname='vm1.osuosl.bak'):
        """
        Create and save a VirtualMachine on a cluster.

        :param cluster: Cluster to attach the VirtualMachine to. When None,
            a new Cluster with fixed test credentials is built.
        :param hostname: hostname given to the new VirtualMachine.
        :returns: tuple of (vm, cluster)

        The cluster is saved and has sync_nodes() called on it whether it
        was passed in or built here.
        """
        if cluster is None:
            cluster = Cluster(hostname='test.osuosl.bak', slug='OSL_TEST',
                              username='foo', password='bar')
        cluster.save()
        cluster.sync_nodes()

        vm = VirtualMachine(cluster=cluster, hostname=hostname)
        vm.save()
        return vm, cluster
class TestVirtualMachineModel(TestCase, VirtualMachineTestCaseMixin):
    """
    Tests for the VirtualMachine model.

    setUp() replaces the RAPI client class referenced by the models module
    with RapiProxy, so the tests drive job state by assigning canned
    responses (JOB, JOB_RUNNING, JOB_DELETE_SUCCESS) to
    vm.rapi.GetJobStatus.response.
    """

    def setUp(self):
        # Substitute the RapiProxy test helper for the RAPI client class.
        models.client.GanetiRapiClient = RapiProxy

    def tearDown(self):
        # Remove every object the tests may have created.
        Job.objects.all().delete()
        VirtualMachine.objects.all().delete()
        Node.objects.all().delete()
        Cluster.objects.all().delete()
        User.objects.all().delete()
        Group.objects.all().delete()
        ClusterUser.objects.all().delete()

    def test_trivial(self):
        """
        Test the test case's setUp().
        """
        pass

    def test_instantiate(self):
        """
        Test that a VirtualMachine can be instantiated without arguments.
        """
        VirtualMachine()

    def test_non_trivial(self):
        """
        Test instantiating a VirtualMachine with extra parameters
        """
        # Define cluster for use
        vm_hostname = 'vm.test.org'
        cluster = Cluster(hostname='test.osuosl.bak', slug='OSL_TEST')
        cluster.save()
        # NOTE(review): the owner is never saved before being assigned to
        # the VM below -- confirm this is intentional.
        owner = ClusterUser(id=32, name='foobar')

        # Cluster
        vm = VirtualMachine(cluster=cluster, hostname=vm_hostname)
        vm.save()
        self.assertTrue(vm.id)
        self.assertEqual('vm.test.org', vm.hostname)
        self.assertFalse(vm.error)
        vm.delete()

        # Multiple
        vm = VirtualMachine(cluster=cluster, hostname=vm_hostname,
                            virtual_cpus=3, ram=512, disk_size=5120,
                            owner=owner)
        vm.save()
        self.assertTrue(vm.id)
        self.assertEqual('vm.test.org', vm.hostname)
        self.assertEqual(512, vm.ram)
        self.assertEqual(5120, vm.disk_size)
        self.assertEqual('foobar', vm.owner.name)
        self.assertFalse(vm.error)

        # test unique constraints
        # TODO: this check is disabled; re-enable it or remove it.
        #vm = VirtualMachine(cluster=cluster, hostname=vm_hostname)
        #self.assertRaises(IntegrityError, vm.save)

    def test_save(self):
        """
        Test saving a VirtualMachine

        Verify:
            * VirtualMachine can be saved
            * VirtualMachine can be loaded
            * Hash is copied from cluster
        """
        vm, cluster = self.create_virtual_machine()
        self.assertTrue(vm.id)
        self.assertFalse(vm.error)
        self.assertEqual(vm.cluster_hash, cluster.hash)

        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertTrue(vm.info)
        self.assertFalse(vm.error)

    def test_hash_update(self):
        """
        When cluster is saved hash for its VirtualMachines should be updated
        """
        vm0, cluster = self.create_virtual_machine()
        vm1, cluster = self.create_virtual_machine(cluster, 'test2.osuosl.bak')

        self.assertEqual(vm0.cluster_hash, cluster.hash)
        self.assertEqual(vm1.cluster_hash, cluster.hash)

        # change cluster's hash
        cluster.hostname = 'SomethingDifferent'
        cluster.save()
        vm0 = VirtualMachine.objects.get(pk=vm0.id)
        vm1 = VirtualMachine.objects.get(pk=vm1.id)
        self.assertEqual(vm0.cluster_hash, cluster.hash, 'VirtualMachine does not have updated cache')
        self.assertEqual(vm1.cluster_hash, cluster.hash, 'VirtualMachine does not have updated cache')

    def test_parse_info(self):
        """
        Test parsing values from cached info

        Verifies:
            * mtime and ctime are parsed
            * ram, virtual_cpus, and disksize are parsed
        """
        vm, cluster = self.create_virtual_machine()
        vm.info = INSTANCE

        self.assertEqual(vm.ctime, datetime.fromtimestamp(1285799513.4741000))
        self.assertEqual(vm.mtime, datetime.fromtimestamp(1285883187.8692000))
        self.assertEqual(vm.ram, 512)
        self.assertEqual(vm.virtual_cpus, 2)
        self.assertEqual(vm.disk_size, 5120)

    def test_update_owner_tag(self):
        """
        Test changing owner
        """
        vm, cluster = self.create_virtual_machine()

        owner0 = ClusterUser(id=74, name='owner0')
        owner1 = ClusterUser(id=21, name='owner1')
        owner0.save()
        owner1.save()

        # no owner
        vm.refresh()
        self.assertEqual([], vm.info['tags'])

        # setting owner
        vm.owner = owner0
        vm.save()
        self.assertEqual(['%s%s' % (constants.OWNER_TAG, owner0.id)], vm.info['tags'])

        # changing owner
        vm.owner = owner1
        vm.save()
        self.assertEqual(['%s%s' % (constants.OWNER_TAG, owner1.id)], vm.info['tags'])

        # setting owner to none
        vm.owner = None
        vm.save()
        self.assertEqual([], vm.info['tags'])

    def test_start(self):
        """
        Test VirtualMachine.startup()

        Verifies:
            * job is created
            * cache is disabled while job is running
            * cache is reenabled when job is finished
        """
        vm, cluster = self.create_virtual_machine()
        vm.rapi.GetJobStatus.response = JOB_RUNNING

        # startup enables ignore_cache flag
        job_id = vm.startup().id
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertTrue(Job.objects.filter(id=job_id).exists())
        self.assertTrue(vm.ignore_cache)
        self.assertTrue(vm.last_job_id)

        # finished job resets ignore_cache flag
        vm.rapi.GetJobStatus.response = JOB
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertFalse(vm.ignore_cache)
        self.assertFalse(vm.last_job_id)
        self.assertTrue(Job.objects.get(id=job_id).finished)

    def test_stop(self):
        """
        Test VirtualMachine.shutdown()

        Verifies:
            * job is created
            * cache is disabled while job is running
            * cache is reenabled when job is finished
        """
        vm, cluster = self.create_virtual_machine()
        vm.rapi.GetJobStatus.response = JOB_RUNNING

        # shutdown enables ignore_cache flag
        job_id = vm.shutdown().id
        self.assertTrue(Job.objects.filter(id=job_id).exists())
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertTrue(vm.ignore_cache)
        self.assertTrue(vm.last_job_id)
        self.assertTrue(Job.objects.filter(id=job_id).values()[0]['ignore_cache'])

        # finished job resets ignore_cache flag
        vm.rapi.GetJobStatus.response = JOB
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertFalse(vm.ignore_cache)
        self.assertFalse(vm.last_job_id)
        self.assertFalse(Job.objects.filter(id=job_id).values()[0]['ignore_cache'])
        self.assertTrue(Job.objects.get(id=job_id).finished)

    def test_reboot(self):
        """
        Test vm.reboot()

        Verifies:
            * job is created
            * cache is disabled while job is running
            * cache is reenabled when job is finished
        """
        vm, cluster = self.create_virtual_machine()
        vm.rapi.GetJobStatus.response = JOB_RUNNING

        # reboot enables ignore_cache flag
        job_id = vm.reboot().id
        self.assertTrue(Job.objects.filter(id=job_id).exists())
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertTrue(vm.ignore_cache)
        self.assertTrue(vm.last_job_id)
        self.assertTrue(Job.objects.filter(id=job_id).values()[0]['ignore_cache'])

        # finished job resets ignore_cache flag
        vm.rapi.GetJobStatus.response = JOB
        self.assertTrue(Job.objects.filter(id=job_id).exists())
        vm = VirtualMachine.objects.get(id=vm.id)
        self.assertFalse(vm.ignore_cache)
        self.assertFalse(vm.last_job_id)
        self.assertFalse(Job.objects.filter(id=job_id).values()[0]['ignore_cache'])
        self.assertTrue(Job.objects.get(id=job_id).finished)

    def test_load_pending_delete(self):
        """
        Tests loading a VM that has a pending delete

        Verifies:
            * The job is still running so the VM will be loaded
        """
        vm, cluster = self.create_virtual_machine()
        vm.rapi.GetJobStatus.response = JOB_RUNNING
        vm.refresh()
        vm.ignore_cache = True
        vm.pending_delete = True
        vm.last_job = Job.objects.create(job_id=1, obj=vm, cluster_id=vm.cluster_id)
        vm.save()

        # Test loading vm, job is running so it should not be deleted yet
        vm = VirtualMachine.objects.get(pk=vm.pk)
        self.assertTrue(vm.id)
        self.assertTrue(vm.pending_delete)
        self.assertFalse(vm.deleted)

    def test_load_deleted(self):
        """
        Tests loading a VM whose pending delete job has finished

        Verifies:
            * The Job is finished. It will load the VM but it will be deleted
              and marked as such.
        """
        vm, cluster = self.create_virtual_machine()
        vm.rapi.GetJobStatus.response = JOB_RUNNING
        vm.refresh()
        vm.ignore_cache = True
        vm.pending_delete = True
        vm.last_job = Job.objects.create(job_id=1, obj=vm, cluster_id=vm.cluster_id)
        vm.save()

        # Keep the primary key: the reloaded instance is expected to have no
        # id (asserted below), so looking the row up through it afterwards
        # would check nothing.
        vm_pk = vm.pk

        # Test loading vm, delete job is finished
        vm.rapi.GetJobStatus.response = JOB_DELETE_SUCCESS
        vm = VirtualMachine.objects.get(pk=vm_pk)
        self.assertFalse(vm.id)
        self.assertTrue(vm.pending_delete)
        self.assertTrue(vm.deleted)
        self.assertFalse(VirtualMachine.objects.filter(pk=vm_pk).exists())