Make manager port dynamic, therefore enabling multiple monitors running in parallel.
[smonitor.git] / monitor / datastore.py
blobd637b4e43b568fb0ef3037c1c7c7236d8c9cc3d0
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # Server monitoring system
6 # Copyright © 2011 Rodrigo Eduardo Lazo Paz
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Multi-dimensional Store.
25 Provides the DataStore class, a sparse three-dimensional storage data
26 structure. It stores data points, or vars, identified by name. Each
27 var has multiple timestamped (unix time) values, or `DataPoint`s,
grouped together into a `DataEntry`, sorted in ascending order,
oldest value first. For example, var `stock_quote` could have the
values (1316509200, 20.3), (1316512800, 21.3), (1316516400, 21.1)
representing the stock values during three consecutive hours on
09/20/11.
34 Vars belong to a group. Each group can have any number of vars. Using
35 the same var name in multiple groups simulates the concept of column
36 in SQL databases.
38 Data is persistent through Python's Pickle interface, therefore, the
39 three classes in this module, `DataPoint`, `DataEntry` and
40 `DataStore`, must be imported when using the dump/load methods,
41 otherwise, Python may complain about missing class declarations.
42 """
44 __author__ = "rlazo.paz@gmail.com (Rodrigo Lazo)"
45 __version__ = 0.2
48 import bisect
49 from itertools import groupby
50 import cPickle
51 import collections
52 import os
class DataPoint(object):  # pylint: disable=R0903
    """Single data point.

    Stores a value, any type, and the timestamp associated with
    it. Provides ordering based solely on the timestamp. Any
    comparison made to a non `DataPoint` object always reports this
    point as smaller (legacy `__cmp__` result -1).
    """
    def __init__(self, timestamp, value):
        self.timestamp = timestamp
        self.value = value

    def __cmp__(self, other):
        # Python 2 ordering hook, kept for backward compatibility.
        if not isinstance(other, DataPoint):
            return -1
        return cmp(self.timestamp, other.timestamp)

    def __lt__(self, other):
        # Rich comparison so sort()/bisect also work on Python 3,
        # where __cmp__ is ignored. Mirrors __cmp__: against a
        # non-DataPoint this point always sorts first.
        if not isinstance(other, DataPoint):
            return True
        return self.timestamp < other.timestamp

    def __eq__(self, other):
        # Equality by timestamp only, matching __cmp__ == 0.
        return (isinstance(other, DataPoint)
                and self.timestamp == other.timestamp)

    # Preserve the original identity-based hash; on Python 3 defining
    # __eq__ alone would otherwise make instances unhashable.
    __hash__ = object.__hash__

    def __str__(self):
        return "%s@%d" % (str(self.value), self.timestamp)

    def as_tuple(self):
        """Returns a tuple (timestamp, value)."""
        return (self.timestamp, self.value)
# TODO: (09/20) compare performance of insert_point and insert_points
# using bisect.insort and sort
class DataEntry(object):
    """List of related `DataPoint`s kept sorted by ascending timestamp.

    Points are stored oldest first, so `get_latest` returns the last
    element.
    """

    def __init__(self):
        # Invariant: always sorted in ascending timestamp order.
        self._points = []

    def insert_point(self, timestamp, value):
        """Inserts a single data point.

        Arguments:
        - `timestamp`: Positive integer, unix time associated with value.
        - `value`: Data to insert.

        Returns:
          True if inserted, False otherwise. Only an invalid timestamp
          will cause the operation to return False.
        """
        if not isinstance(timestamp, int) or timestamp < 0:
            return False
        bisect.insort(self._points, DataPoint(timestamp, value))
        return True

    def insert_points(self, values):
        """Inserts a list of values.

        Arguments:
        - `values`: Iterable, containing pairs of (timestamp,
          value). `timestamp` must be positive integer, `value` can be
          any object.

        Returns:
          True if all the points in `values` could be correctly
          inserted, False otherwise. Only an invalid timestamp will
          cause the operation to return False; valid pairs are still
          inserted.
        """
        flag = True
        for timestamp, value in values:
            if not isinstance(timestamp, int) or timestamp < 0:
                flag = False
            else:
                self._points.append(DataPoint(timestamp, value))
        # One sort at the end is cheaper than insort per element.
        self._points.sort()
        return flag

    def get_latest(self):
        """Returns the latest, by timestamp, `DataPoint`.

        Raises IndexError when the entry is empty.
        """
        return self._points[-1]

    def get_all(self):
        """Returns an iterable of all `DataPoint`s, sorted by timestamp."""
        return self._points

    def get_all_compact(self):
        """Returns an iterable of distinct `DataPoint`s, sorted by timestamp.

        If consecutive `DataPoint`s have the same value, only two are
        returned, the first and last by timestamp.
        """
        result = []
        for _, iterable in groupby(self._points, key=lambda x: x.value):
            values = list(iterable)
            result.append(values[0])
            if len(values) >= 2:
                result.append(values[-1])
        return result

    def get_max_value(self):
        """Returns the largest value stored."""
        return max(self._points, key=lambda x: x.value).value

    def get_min_value(self):
        """Returns the smallest value stored."""
        return min(self._points, key=lambda x: x.value).value

    def get_since(self, timestamp):
        """Builds an iterable of `DataPoint`s since `timestamp`.

        Arguments:
        - `timestamp`: Positive integer, represents the timestamp of
          the earliest `DataPoint` to return.

        Returns:
          An iterable of sorted, by timestamp, `DataPoint`s whose
          timestamp value is greater or equal to `timestamp` argument.
        """
        dummy_point = DataPoint(timestamp, None)
        # bisect_left lands on the FIRST point whose timestamp equals
        # `timestamp`. The previous bisect (bisect_right) + step-back
        # approach rewound at most one slot, silently dropping earlier
        # duplicates that share the query timestamp.
        index = bisect.bisect_left(self._points, dummy_point)
        return self._points[index:]
173 # TODO: (09/20) Make the object thread-safe.
class DataStore(object):
    """Multi-dimensional data store.

    See file level comments for further information.
    """

    def __init__(self):
        # Layout: {group_name: {var_name: DataEntry}}.
        self._store = collections.defaultdict(dict)
        self._vars = set()           # lazily-built cache of var names
        self._is_snapshot = False    # True once populated via load()

    def insert(self, group, var, timestamp, value):
        """Inserts a single data point.

        Arguments:
        - `group`: String, point's group name
        - `var`: String, point's var name
        - `timestamp`: Positive integer, timestamp associated with
          this point
        - `value`: Object, data to store.

        Returns:
          True, if value was correctly inserted, False otherwise. Only
          invalid timestamp values will cause the rejection of an insert.
        """
        return self._store[group].setdefault(
            var, DataEntry()).insert_point(timestamp, value)

    def insert_dict(self, data_dict):
        """Inserts multiple data points.

        `data_dict` must be a dictionary of values in the form:

        {'group': {'var': (timestamp, val), "var": (timestamp, val)}}

        See `DataStore.insert` for detailed definition of valid values
        for each element of the dictionary.

        Returns:
          True, if all values were correctly inserted, False
          otherwise. Only invalid timestamp values will cause the
          rejection of an insert.
        """
        result = True
        for group_name, entry in data_dict.items():
            for var_name, datapoint in entry.items():
                # Call insert() unconditionally: the previous
                # `result and self.insert(...)` form short-circuited
                # after the first failure and silently skipped every
                # remaining point.
                inserted = self.insert(group_name, var_name,
                                       datapoint[0], datapoint[1])
                result = result and inserted
        return result

    def get_group(self, group):
        """Lists all vars, and corresponding `DataEntry`s, for `group`.

        Returns:
          A dictionary in the form {'varname': `DataEntry`}, or
          empty if group does not exist or doesn't contain data.
        """
        # Explicit membership test keeps the defaultdict from
        # materializing an empty group on lookup.
        return self._store[group] if group in self._store else {}

    def get_var(self, var):
        """Lists all groups, and corresponding `DataEntry`s, for `var`.

        Returns:
          A dictionary in the form {'hostname': `DataEntry`}, or
          empty if var does not exist.
        """
        self._update_vars()
        sol = {}
        if var in self._vars:
            for group in self._store:
                if var in self._store[group]:
                    sol[group] = self._store[group][var]
        return sol

    def list_groups(self):
        """Returns a list containing the name of every stored group."""
        # list() guarantees a real list (not a dict view) on Python 3.
        return list(self._store.keys())

    def list_vars(self):
        """Returns a list containing the name of every stored var."""
        self._update_vars()
        return list(self._vars)

    def load(self, filename):
        """Loads data from `filename`.

        Any internal data stored will be deleted before loading the
        file.

        Warning: this unpickles the file contents — only load
        snapshots from trusted sources.

        Arguments:
        - `filename`: String, path to a file created by
          `DataStore.dump` method.
        """
        with open(filename, 'rb') as fd:
            obj = cPickle.load(fd)
        self._store = obj
        self._is_snapshot = True
        self._vars.clear()  # cache is stale; rebuilt on next query

    def dump(self, filename):
        """Creates a snapshot of this object.

        Generated file is binary. For a textual representation of the
        data, see `DataStore.dump_as_text`.

        Arguments:
        - `filename`: String, path of the file to create/overwrite.
        """
        with open(filename, 'wb', 0) as fd:
            cPickle.dump(self._store, fd, cPickle.HIGHEST_PROTOCOL)
            fd.flush()
            os.fsync(fd.fileno())  # force data to disk before closing

    def dump_obj(self):
        """Creates a snapshot of this object and returns it as a pickle string."""
        return cPickle.dumps(self._store)

    def dump_as_text(self, filename):
        """Creates a human-readable snapshot of this object.

        The file created by this method cannot be loaded again. To
        create a snapshot for data persistency, see `DataStore.dump`.

        Arguments:
        - `filename`: String, path of the file to create/overwrite.
        """
        with open(filename, 'w') as fd:
            for groupname, varss in self._store.items():
                for varname, entry in varss.items():
                    points = (str(x.as_tuple()) for x in entry.get_all())
                    fd.write("%s@%s: %s\n" % (varname, groupname,
                                              " ".join(points)))
            fd.flush()
            os.fsync(fd.fileno())

    def is_snapshot(self):
        """Returns True if this store was populated via `load`."""
        return self._is_snapshot

    def _update_vars(self, force=False):
        """Updates internal _vars cache."""
        if not self._vars or force:
            for group_vars in self._store.values():
                self._vars.update(group_vars.keys())