Ticket #6465 - Cache updater: Error processing virtual_machine
[ganeti_webmgr.git] / ganeti_web / cache / virtual_machine.py
blob641e65023765815ad239fedd71551937c0ca71b6
1 # Copyright (C) 2010 Oregon State University et al.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
16 # USA.
18 import cPickle
19 from datetime import datetime
21 from django.utils import simplejson
22 from twisted.internet import reactor
23 from twisted.internet.defer import DeferredList, Deferred
24 from twisted.web import client
25 from ganeti_web.cache import Timer, Counter
26 from ganeti_web.models import Cluster, VirtualMachine
VMS_URL = 'https://%s:%s/2/instances?bulk=1'


class VirtualMachineCacheUpdater(object):
    """
    Refreshes the cached ganeti data stored on VirtualMachine records.

    Instance info is fetched in bulk (VMS_URL) from every cluster with
    twisted's asynchronous web client. A VirtualMachine row is only rewritten
    when the VM is new, its ganeti mtime is newer than the stored one, or its
    status changed. Errors are reported per cluster and per VM so that one
    failure cannot stall the rest of the update.
    """

    def update(self):
        """
        Updates the cache for all VirtualMachines in all clusters. This method
        processes the data in bulk, where possible, to reduce runtime.
        Generally this should be faster than refreshing individual
        VirtualMachines.

        @return DeferredList that fires (and then calls complete()) once every
            cluster has been processed, successfully or not.
        """
        self.timer = Timer()
        print('------[vm cache update]-------------------------------')
        clusters = Cluster.objects.all()
        deferreds = [self.get_cluster_info(cluster) for cluster in clusters]
        # Failures are already reported by _report_cluster_error(); consume
        # them so they are not logged again as "Unhandled error in Deferred".
        deferred_list = DeferredList(deferreds, consumeErrors=True)
        deferred_list.addCallback(self.complete)
        return deferred_list

    def get_cluster_info(self, cluster):
        """
        Fetch cluster info from ganeti and process it.

        @param cluster - cluster whose instances should be refreshed
        @return Deferred that fires with None once the cluster has been fully
            processed, or errbacks with the Failure if fetching or processing
            failed (e.g. cluster unreachable, bad response, database error).
        """
        url = str(VMS_URL % (cluster.hostname, cluster.port))
        d = client.getPage(url)
        # process_cluster_info returns a DeferredList; twisted chains it into
        # `d`, so `d` only fires once every VM of the cluster is handled.
        d.addCallback(self.process_cluster_info, cluster)
        # Without an errback a failure here would leave the Deferred unfired
        # forever and the whole update would never complete.
        d.addErrback(self._report_cluster_error, cluster)
        return d

    def _report_cluster_error(self, failure, cluster):
        """
        Errback for get_cluster_info(): report why a cluster could not be
        processed, then pass the failure on to the caller.
        """
        print('%s: error processing virtual machines' % cluster.hostname)
        print(failure.getTraceback())
        return failure

    def process_cluster_info(self, json, cluster, callback=None):
        """
        Process data received from ganeti for a single cluster.

        Every instance is handed to update_vm(). Once all of them have been
        handled, the `cached` timestamp of every VM in the cluster is set.

        @param json - raw response body returned by ganeti
        @param cluster - cluster the data was fetched from
        @param callback - optional; called with None once the cluster has been
            completely processed
        @return DeferredList that fires when processing is complete
        """
        print('%s:' % cluster.hostname)
        infos = simplejson.loads(json)
        self.timer.tick('info fetched from ganeti ')
        updated = Counter()
        base = cluster.virtual_machines.all()
        mtimes = base.values_list('hostname', 'id', 'mtime', 'status')

        # hostname -> (id, mtime as float or None, status)
        data = {}
        for name, vm_id, mtime, status in mtimes:
            data[name] = (vm_id, float(mtime) if mtime else None, status)
        self.timer.tick('mtimes fetched from db ')

        deferreds = [self.update_vm(cluster, info, data, updated)
                     for info in infos]
        # Per-VM failures are reported by update_vm(); consume them here so a
        # single bad VM neither aborts the cluster nor gets logged twice.
        deferred_list = DeferredList(deferreds, consumeErrors=True)

        # Batch update the cache updated time for all VMs in this cluster.
        # This sets the last updated time both for VMs that were modified and
        # for those that weren't: even if a VM wasn't modified we want its
        # last updated time to be current.
        # XXX don't bother checking whether this query needs to run. In
        # normal usage it will almost always need to.
        def update_timestamps(result):
            print(' updated: %s out of %s' % (updated, len(infos)))
            failed = sum(1 for success, _ in result if not success)
            if failed:
                print(' failed: %s out of %s' % (failed, len(infos)))
            base.update(cached=datetime.now())
            self.timer.tick('records or timestamps updated')
        deferred_list.addCallback(update_timestamps)

        if callback is not None:
            deferred_list.addCallback(callback)
        return deferred_list

    def update_vm(self, cluster, info, data, updated):
        """
        Updates an individual VirtualMachine. This only schedules the work
        with reactor.callLater(0, ...); the actual work is done in
        _update_vm().

        @param cluster - cluster this virtual machine is on
        @param info - info from ganeti
        @param data - data from database
        @param updated - counter object
        @return Deferred that fires with the VM's id once _update_vm() has
            completed, or errbacks with the Failure if it raised
        """
        deferred = Deferred()

        def report_error(failure):
            print(' Error processing virtual machine %s' % info.get('name'))
            print(failure.getTraceback())
            return failure
        deferred.addErrback(report_error)

        def run():
            try:
                self._update_vm(cluster, info, data, updated,
                                deferred.callback)
            except Exception:
                # An exception escaping a callLater call is only logged by
                # the reactor, and `deferred` would never fire, stalling
                # the DeferredList waiting on it. errback() with no argument
                # wraps the exception currently being handled.
                if not deferred.called:
                    deferred.errback()

        reactor.callLater(0, run)
        return deferred

    def _update_vm(self, cluster, info, data, updated, callback):
        """
        Updates an individual VirtualMachine. This is the actual work function.

        The whole record is only rewritten if the VM is new, its ganeti mtime
        is newer than the stored one, or its status changed.

        @param cluster - cluster this virtual machine is on
        @param info - info from ganeti
        @param data - data from database: hostname -> (id, mtime, status)
        @param updated - counter object, incremented for every VM written
        @param callback - callback fired with the VM's id when the method is
            complete
        """
        name = info['name']
        if name in data:
            vm_id, mtime, status = data[name]
            # XXX status changes will not always be reflected in mtime.
            # Explicitly check status to see if it has changed; failing to
            # check this would result in state changes being lost.
            changed = (not mtime or mtime < info['mtime']
                       or status != info['status'])
            if changed:
                print(' Virtual Machine (updated) : %s' % name)
                parsed = VirtualMachine.parse_persistent_info(info)
                VirtualMachine.objects.filter(pk=vm_id).update(
                    serialized_info=cPickle.dumps(info), **parsed)
                updated += 1
        else:
            # new vm
            vm = VirtualMachine(cluster=cluster, hostname=name)
            vm.info = info
            vm.save()
            vm_id = vm.id
            updated += 1

        callback(vm_id)

    def complete(self, result):
        """ callback fired when everything is complete """
        self.timer.stop()