2 # -*- coding: utf-8 -*-
4 # Server monitoring system
6 # Copyright © 2011 Rodrigo Eduardo Lazo Paz
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Multi-dimensional Store.

Provides the DataStore class, a sparse three-dimensional storage data
structure. It stores data points, or vars, identified by name. Each
var has multiple timestamped (unix time) values, or `DataPoint`s,
grouped together into a `DataEntry` and kept sorted by timestamp. For
example, var `stock_quote` could have the values (1316516400, 21.1),
(1316512800, 21.3), (1316509200, 20.3) representing the stock values
during three consecutive hours.

Vars belong to a group. Each group can have any number of vars. Using
the same var name in multiple groups simulates the concept of column
families.

Data is persistent through Python's Pickle interface, therefore, the
three classes in this module, `DataPoint`, `DataEntry` and
`DataStore`, must be imported when using the dump/load methods,
otherwise, Python may complain about missing class declarations.
"""
__author__ = "rlazo.paz@gmail.com (Rodrigo Lazo)"
import bisect
import collections
import os
import pickle
from itertools import groupby
class DataPoint(object):  # pylint: disable=R0903
    """Single timestamped value.

    Stores a value, any type, and the unix timestamp associated with
    it. Ordering and equality are based solely on the timestamp, so
    two points with equal timestamps compare equal regardless of
    value. Comparing against a non-`DataPoint` object returns
    NotImplemented (the Python 2 `__cmp__` this replaces is ignored
    by Python 3 and would break `bisect` on `DataPoint` lists).
    """

    def __init__(self, timestamp, value):
        # Unix time (seconds); the sole comparison key for this point.
        self.timestamp = timestamp
        # Payload; any type, never used for comparisons.
        self.value = value

    def __lt__(self, other):
        if not isinstance(other, DataPoint):
            return NotImplemented
        return self.timestamp < other.timestamp

    def __eq__(self, other):
        if not isinstance(other, DataPoint):
            return NotImplemented
        return self.timestamp == other.timestamp

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Consistent with __eq__: equal timestamps hash alike.
        return hash(self.timestamp)

    def __repr__(self):
        return "%s@%d" % (str(self.value), self.timestamp)

    def as_tuple(self):
        """Returns a tuple (timestamp, value)."""
        return (self.timestamp, self.value)
# TODO: (09/20) compare performance of insert_point and insert_points
# using bisect.insort and sort
class DataEntry(object):
    """List of related `DataPoint`s kept sorted by timestamp.

    Points are stored in ascending timestamp order (oldest first), so
    the newest point is always the last element.
    """

    def __init__(self):
        # Ascending-by-timestamp list of DataPoint objects.
        self._points = []

    def insert_point(self, timestamp, value):
        """Inserts a single data point.

        Args:
        - `timestamp`: Positive integer, unix time associated with value.
        - `value`: Data to insert.

        Returns:
        True if inserted, False otherwise. Only an invalid timestamp
        will cause the operation to return False.
        """
        if not isinstance(timestamp, int) or timestamp < 0:
            return False
        bisect.insort(self._points, DataPoint(timestamp, value))
        return True

    def insert_points(self, values):
        """Inserts a list of values.

        Args:
        - `values`: Iterable, containing pairs of (timestamp,
          value). `timestamp` must be a positive integer, `value` can
          be any object.

        Returns:
        True if all the points in `values` could be correctly
        inserted, False otherwise. Only an invalid timestamp will
        cause the operation to return False.
        """
        # Validate everything before touching self._points so a bad
        # pair halfway through cannot leave a partial insert behind.
        pending = []
        for timestamp, value in values:
            if not isinstance(timestamp, int) or timestamp < 0:
                return False
            pending.append(DataPoint(timestamp, value))
        self._points.extend(pending)
        self._points.sort()
        return True

    def get_latest(self):
        """Returns the latest, by timestamp, `DataPoint`."""
        return self._points[-1]

    def get_all(self):
        """Returns an iterable of all `DataPoint`s, sorted by timestamp."""
        return iter(self._points)

    def get_all_compact(self):
        """Returns an iterable of distinct `DataPoint`s, sorted by timestamp.

        If consecutive `DataPoint`s have the same value, only two are
        returned, the first and last by timestamp.
        """
        result = []
        for _, run in groupby(self._points, key=lambda point: point.value):
            points = list(run)
            result.append(points[0])
            # A run of length one contributes a single point, not two.
            if len(points) > 1:
                result.append(points[-1])
        return result

    def get_max_value(self):
        """Returns the largest value stored."""
        return max(self._points, key=lambda point: point.value).value

    def get_min_value(self):
        """Returns the smallest value stored."""
        return min(self._points, key=lambda point: point.value).value

    def get_since(self, timestamp):
        """Builds an iterable of `DataPoint`s since `timestamp`.

        Args:
        - `timestamp`: Positive integer, represents the timestamp of
          the earliest `DataPoint` to return.

        Returns:
        An iterable of sorted, by timestamp, `DataPoint`s whose
        timestamp value is greater or equal to `timestamp` argument.
        """
        marker = DataPoint(timestamp, None)
        index = bisect.bisect(self._points, marker)
        # bisect_right lands just past any point with an equal
        # timestamp; step back so points at exactly `timestamp`
        # (DataPoint equality is timestamp-only) are included.
        if index > 0 and self._points[index - 1] == marker:
            index -= 1
        return self._points[index:]
173 # TODO: (09/20) Make the object thread-safe.
class DataStore(object):
    """Multi-dimensional data store.

    Maps group names to {var name: `DataEntry`} dictionaries. See
    file level comments for further information.
    """

    def __init__(self):
        # {group_name: {var_name: DataEntry}}; defaultdict so groups
        # spring into existence on first insert.
        self._store = collections.defaultdict(dict)
        # Cache of every var name across all groups; kept fresh by
        # insert() and rebuilt lazily by _update_vars().
        self._vars = set()
        # True once this object's data was loaded from a dump file.
        self._is_snapshot = False

    def insert(self, group, var, timestamp, value):
        """Inserts a single data point.

        Args:
        - `group`: String, point's group name.
        - `var`: String, point's var name.
        - `timestamp`: Positive integer, timestamp associated with
          the value.
        - `value`: Object, data to store.

        Returns:
        True, if value was correctly inserted, False otherwise. Only
        invalid timestamp values will cause the rejection of an insert.
        """
        entry = self._store[group].setdefault(var, DataEntry())
        # Keep the var cache current; otherwise vars inserted after
        # the first list_vars() call never showed up in the cache.
        # The entry exists in _store from here on even if the insert
        # below is rejected, so the cache add is unconditional.
        self._vars.add(var)
        return entry.insert_point(timestamp, value)

    def insert_dict(self, data_dict):
        """Inserts multiple data points.

        `data_dict` must be a dictionary of values in the form:

            {'group': {'var': (timestamp, val), "var": (timestamp, val)}}

        See `DataStore.insert` for a detailed definition of valid
        values for each element of the dictionary.

        Returns:
        True, if all values were correctly inserted, False
        otherwise. Only invalid timestamp values will cause the
        rejection of an insert.
        """
        result = True
        for group_name, entry in data_dict.items():
            for var_name, datapoint in entry.items():
                # Call insert() first: `result and self.insert(...)`
                # would short-circuit and silently skip every insert
                # after the first failure.
                inserted = self.insert(group_name, var_name,
                                       datapoint[0], datapoint[1])
                result = result and inserted
        return result

    def get_group(self, group):
        """Lists all vars, and corresponding `DataEntry`s, for `group`.

        Returns:
        A dictionary in the form {'varname': `DataEntry`}, or an
        empty dict if the group does not exist or contains no data.
        """
        # Membership test first: indexing the defaultdict directly
        # would silently create an empty group.
        return self._store[group] if group in self._store else {}

    def get_var(self, var):
        """Lists all groups, and corresponding `DataEntry`s, for `var`.

        Returns:
        A dictionary in the form {'groupname': `DataEntry`}, or an
        empty dict if the var does not exist.
        """
        sol = {}
        self._update_vars()
        if var in self._vars:
            for group in self._store:
                if var in self._store[group]:
                    sol[group] = self._store[group][var]
        return sol

    def list_groups(self):
        """Returns a list containing the name of every stored group."""
        # list(...) so callers get a real list on Python 3 as well,
        # as the docstring promises.
        return list(self._store)

    def list_vars(self):
        """Returns a list containing the name of every stored var."""
        self._update_vars()
        return list(self._vars)

    def load(self, filename):
        """Loads data from `filename`.

        Any internal data stored will be deleted before loading the
        new content.

        Args:
        - `filename`: String, path to a file created by the
          `DataStore.dump` method.
        """
        # NOTE(review): unpickling executes arbitrary code; only load
        # snapshot files from trusted sources.
        with open(filename, 'rb') as fd:
            obj = pickle.load(fd)
        self._store = obj
        # Invalidate the var cache; _update_vars() rebuilds it lazily.
        self._vars = set()
        self._is_snapshot = True

    def dump(self, filename):
        """Creates a snapshot of this object.

        The generated file is binary. For a textual representation of
        the data, see `DataStore.dump_as_text`.

        Args:
        - `filename`: String, path of the file to create/overwrite.
        """
        # Unbuffered binary file plus fsync: the snapshot reaches the
        # disk before this method returns.
        with open(filename, 'wb', 0) as fd:
            pickle.dump(self._store, fd, pickle.HIGHEST_PROTOCOL)
            os.fsync(fd.fileno())

    def dumps(self):
        """Creates a snapshot of this object and returns it as a pickle string.

        NOTE(review): this method's original name is not visible in
        the source view; 'dumps' is assumed -- confirm against callers.
        """
        return pickle.dumps(self._store)

    def dump_as_text(self, filename):
        """Creates a human-readable snapshot of this object.

        The file created by this method cannot be loaded again. To
        create a snapshot for data persistency, see `DataStore.dump`.

        Args:
        - `filename`: String, path of the file to create/overwrite.
        """
        with open(filename, 'w') as fd:
            for groupname, varss in self._store.items():
                for varname, entry in varss.items():
                    points = (str(x.as_tuple()) for x in entry.get_all())
                    # NOTE(review): the separator between points was
                    # not visible in the source view; a space is assumed.
                    fd.write("%s@%s: %s\n" % (varname, groupname,
                                              " ".join(points)))
            # Text mode is buffered: flush Python's buffer before
            # asking the OS to sync, or fsync may persist stale data.
            fd.flush()
            os.fsync(fd.fileno())

    def is_snapshot(self):
        """Returns True if this object's data was loaded from a snapshot."""
        return self._is_snapshot

    def _update_vars(self, force=False):
        """Updates the internal _vars cache (only when empty or forced)."""
        if not self._vars or force:
            self._vars.update(*[v.keys() for v in self._store.values()])