ganeti_webmgr.git / ganeti/cache/node.py
# Copyright (C) 2010 Oregon State University et al.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
from datetime import datetime
import cPickle

from twisted.internet.defer import DeferredList, Deferred
from twisted.internet import reactor
from twisted.web import client

from django.utils import simplejson

from ganeti.cache import Timer, Counter
from ganeti.models import Cluster, Node, VirtualMachine


NODES_URL = 'https://%s:%s/2/nodes?bulk=1'
class NodeCacheUpdater(object):
    """
    Updates the cache for all Nodes in all clusters. This class processes
    the data in bulk, where possible, to reduce runtime. Generally this
    should be faster than refreshing individual Nodes.
    """

    def update(self):
        """ start the update process """
        self.timer = Timer()
        print '------[node cache update]-------------------------------'
        clusters = Cluster.objects.all()
        deferreds = [self.get_cluster_info(cluster) for cluster in clusters]
        deferred_list = DeferredList(deferreds)
        deferred_list.addCallback(self.complete)
        return deferred_list
    def get_cluster_info(self, cluster):
        """
        fetch cluster info from ganeti
        """
        deferred = Deferred()
        d = client.getPage(str(NODES_URL % (cluster.hostname, cluster.port)))
        d.addCallback(self.process_cluster_info, cluster, deferred.callback)
        return deferred
    def process_cluster_info(self, json, cluster, callback):
        """
        process data received from ganeti.
        """
        print '%s:' % cluster.hostname
        infos = simplejson.loads(json)
        self.timer.tick('info fetched from ganeti ')
        updated = Counter()
        base = cluster.nodes.all()
        mtimes = base.values_list('hostname', 'id', 'mtime')

        data = {}
        for hostname, id, mtime in mtimes:
            data[hostname] = (id, float(mtime) if mtime else None)
        self.timer.tick('mtimes fetched from db ')

        deferreds = [self.update_node(cluster, info, data, updated)
                     for info in infos]
        deferred_list = DeferredList(deferreds)

        # Batch update the cache timestamp for all Nodes in this cluster.
        # This sets the last updated time for both Nodes that were modified
        # and those that weren't: even if a Node wasn't modified we want its
        # last updated time to be current.
        #
        # XXX don't bother checking whether this query needs to run; with
        # normal usage it almost always will.
        def update_timestamps(result):
            print '    updated: %s out of %s' % (updated, len(infos))
            base.update(cached=datetime.now())
            self.timer.tick('records or timestamps updated')
        deferred_list.addCallback(update_timestamps)
        deferred_list.addCallback(callback)

        return deferred_list
    def update_node(self, cluster, info, data, updated):
        """
        updates an individual node: this just sets up the work in a deferred
        by using callLater. Actual work is done in _update_node().

        @param cluster - cluster this node is on
        @param info - info from ganeti
        @param data - data from database
        @param updated - counter object
        @return Deferred chained to _update_node() call
        """
        deferred = Deferred()
        args = (cluster, info, data, updated, deferred.callback)
        reactor.callLater(0, self._update_node, *args)
        return deferred
    def _update_node(self, cluster, info, data, updated, callback):
        """
        updates an individual node; this is the actual work function

        @param cluster - cluster this node is on
        @param info - info from ganeti
        @param data - data from database
        @param updated - counter object
        @param callback - callback fired when method is complete.
        """
        hostname = info['name']
        if hostname in data:
            id, mtime = data[hostname]
            if not mtime or mtime < info['mtime']:
                print '    Node (updated) : %s' % hostname
                #print '        %s :: %s' % (mtime, datetime.fromtimestamp(info['mtime']))
                # only update the whole object if it is new or modified.
                parsed = Node.parse_persistent_info(info)
                Node.objects.filter(pk=id) \
                    .update(serialized_info=cPickle.dumps(info), **parsed)
                updated += 1
        else:
            # new node
            node = Node(cluster=cluster, hostname=info['name'])
            node.info = info
            node.save()
            id = node.pk

        # Update relationships between this Node and its primary and
        # secondary VirtualMachines. This always runs, even when there are
        # no updates, but it should execute quickly since it runs against an
        # indexed column.
        #
        # XXX this blocks, so it may be worthwhile to spin this off into a
        # deferred just to break up this method.
        VirtualMachine.objects \
            .filter(hostname__in=info['pinst_list']) \
            .update(primary_node=id)

        VirtualMachine.objects \
            .filter(hostname__in=info['sinst_list']) \
            .update(secondary_node=id)

        callback(id)
    def complete(self, result):
        """ callback fired when everything is complete """
        self.timer.stop()
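

# Minimal usage sketch (not part of the original module): the updater is
# presumably driven by the project's periodic cache-refresh process, but
# assuming Django settings are already configured, a single bulk update can
# be run directly under the Twisted reactor as shown here. The scheduling
# below is illustrative only.
if __name__ == '__main__':
    def _run_once():
        # run one bulk update, then stop the reactor when the DeferredList
        # fires (whether it succeeded or failed)
        updater = NodeCacheUpdater()
        d = updater.update()
        d.addBoth(lambda result: reactor.stop())

    reactor.callWhenRunning(_run_once)
    reactor.run()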