[FIX] Problem loading livesync
[cds-indico.git] / indico / ext / livesync / agent.py
blobf8e0403a718a498645980207b046f0284815e496
1 # -*- coding: utf-8 -*-
2 ##
3 ##
4 ## This file is part of CDS Indico.
5 ## Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007 CERN.
6 ##
7 ## CDS Indico is free software; you can redistribute it and/or
8 ## modify it under the terms of the GNU General Public License as
9 ## published by the Free Software Foundation; either version 2 of the
10 ## License, or (at your option) any later version.
12 ## CDS Indico is distributed in the hope that it will be useful, but
13 ## WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 ## General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with CDS Indico; if not, write to the Free Software Foundation, Inc.,
19 ## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
21 """
22 Module containing the persistent classes that will be stored in the DB
23 """
24 # standard lib imports
25 import datetime
27 # dependency libs
28 import zope.interface
29 from persistent import Persistent, mapping
31 # indico extpoint imports
32 from indico.core.extpoint import Component
33 from indico.util.fossilize import IFossil, fossilizes, Fossilizable, conversion
35 # plugin imports
36 from indico.ext.livesync.struct import SetMultiPointerTrack
37 from indico.ext.livesync.util import getPluginType
38 from indico.ext.livesync.struct import EmptyTrackException
39 from indico.ext.livesync.base import ILiveSyncAgentProvider, MPT_GRANULARITY
40 from indico.ext.livesync.db import updateDBStructures
42 # legacy indico imports
43 from MaKaC import conference
44 from MaKaC.common import DBMgr
class QueryException(Exception):
    """
    Signals a problem with a query made to the agent manager
    (for instance, a query that specifies no criteria at all).
    """
class AgentExecutionException(Exception):
    """
    Signals that something went wrong while an agent was being executed
    (for instance, an agent that has no manager attached).
    """
class IAgentFossil(IFossil):
    """
    Fossil interface declared by ``SyncAgent`` (through ``fossilizes``).

    Every method below carries the name of a getter implemented by the
    agent; the bodies are intentionally empty, since only the declarations
    matter here.
    NOTE(review): presumably the fossilize machinery calls the matching
    getters to build the exported representation - confirm in
    ``indico.util.fossilize``.
    """

    # agent ID
    def getId(self):
        pass

    # agent name
    def getName(self):
        pass

    # whether the agent is currently active
    def isActive(self):
        pass

    # agent description
    def getDescription(self):
        pass

    # last timestamp, as kept by the agent's pointer in the track
    def getLastTS(self):
        pass

    # same information as getLastTS, but as a datetime
    def getLastDT(self):
        pass
    # NOTE(review): looks like this tells the fossilizer to run the value
    # through the datetime conversion - confirm against Conversion.datetime
    getLastDT.convert = conversion.Conversion.datetime

    # agent-specific (extra) options
    def getExtraOptions(self):
        pass
    # NOTE(review): presumably the key under which the value is exported
    # ('specific' instead of a name derived from the method) - confirm
    getExtraOptions.name = 'specific'
84 class SyncAgent(Fossilizable, Persistent):
85 """
86 Represents an "agent" (service)
87 """
89 fossilizes(IAgentFossil)
91 _extraOptions = {}
93 # TODO: Subclass into PushSyncAgent(task)/PullSyncAgent?
95 def __init__(self, aid, name, description, updateTime, access=None):
96 self._id = aid
97 self._name = name
98 self._description = description
99 self._updateTime = updateTime
100 self._manager = None
101 self._active = False
102 self._recording = False
103 self._access = access
105 def record_str(self, (obj, objId, status)):
107 Translates the objects/states to an easy to read textual representation
110 def setManager(self, manager):
111 self._manager = manager
113 def isActive(self):
114 return self._active
116 def isRecording(self):
117 return self._recording
119 def setActive(self, value):
120 self._active = value
122 def preActivate(self, ts):
123 track = self._manager.getTrack()
125 try:
126 track.movePointer(self._id, ts / \
127 self._manager.getGranularity() - 1)
128 except EmptyTrackException:
129 # if the track is empty, don't bother doing this
130 pass
132 # this means everything from the pointer to present will be considered
133 # when the next update is done
134 self._recording = True
136 def getId(self):
137 return self._id
139 def getLastTS(self):
140 return self._manager.getTrack().getPointerTimestamp(self._id)
142 def getLastDT(self):
143 ts = self.getLastTS()
144 return datetime.datetime.utcfromtimestamp(ts * \
145 self._manager.getGranularity()) if ts else None
147 def getName(self):
148 return self._name
150 def getDescription(self):
151 return self._description
153 def getExtraOptions(self):
154 return dict((option, self.getExtraOption(option))
155 for option in self._extraOptions)
157 def getExtraOption(self, optionName):
158 if optionName in self._extraOptions:
159 return getattr(self, "_%s" % optionName)
160 else:
161 raise Exception('unknown option!')
163 def setExtraOption(self, optionName, value):
164 if optionName in self._extraOptions:
165 setattr(self, "_%s" % optionName, value)
166 else:
167 raise Exception('unknown option!')
169 def setParameters(self, description=None,
170 name=None):
171 if description:
172 self._description = description
173 if name:
174 self._name = name
class AgentProviderComponent(Component):
    """
    Component whose only job is to announce to LiveSync that a given
    agent type exists. Subclasses are expected to define ``_agentType``.
    """

    zope.interface.implements(ILiveSyncAgentProvider)

    # ILiveSyncAgentProvider
    def providesLiveSyncAgentType(self, obj, types):
        """
        Adds ``_agentType`` to the ``types`` dictionary, keyed by the
        class name. Does nothing if no ``_agentType`` has been defined.
        """
        if not hasattr(self, '_agentType'):
            return

        agentType = self._agentType
        types[agentType.__name__] = agentType
class PushSyncAgent(SyncAgent):
    """
    PushSyncAgents are agents that actively send data to remote services,
    instead of waiting to be queried.
    """

    # Should specify which worker will be used
    _workerClass = None

    def __init__(self, aid, name, description, updateTime, access=None):
        """
        :param aid: agent ID
        :param name: agent name
        :param description: a description of the agent
        :param updateTime: passed on, untouched, to ``SyncAgent``
        :param access: an Indico user/group that has equivalent access
        """
        # `access` is handed to the parent constructor, which stores it
        # in `_access` (previously it was dropped in the call and then
        # assigned a second time here)
        super(PushSyncAgent, self).__init__(aid, name, description,
                                            updateTime, access=access)
        # value of `till` used in the last successful run (see `run`)
        self._lastTry = None

    def _run(self, data, logger=None, monitor=None, dbi=None):
        """
        Overloaded - will contain the specific agent code

        :param data: whatever `_generateRecords` returned
        :returns: anything but ``None`` is taken by `run` as success
        """
        raise Exception("Undefined method")

    def _generateRecords(self, data, lastTS, dbi=None):
        """
        :param data: iterable containing data to be converted
        :param lastTS:

        Takes the raw data (i.e. "event created", etc) and transforms
        it into a sequence of 'record/action' pairs.

        Basically, this function reduces actions to remove server "commands"

        i.e. ``modified 1234, deleted 1234`` becomes just ``delete 1234``

        Overloaded by agents
        """

    def run(self, currentTS, logger=None, monitor=None, dbi=None):
        """
        Main method, called when agent needs to be run

        :param currentTS: current timestamp, or ``None`` to set no upper
                          limit on the query
        :param logger: optional logger (``info``/``exception`` are used)
        :param monitor: passed on to `_run`
        :param dbi: passed on to `_generateRecords` and `_run`
        :returns: the limit that was used (``till``) if `_run` returned
                  something other than ``None``; ``None`` if it did not,
                  or if record generation/execution raised an exception.
                  Note that with ``currentTS=None`` the limit itself is
                  ``None``.
        :raises AgentExecutionException: if the agent has no manager
        """

        # The manager is needed to compute `till`, so this check has to
        # come first; otherwise a manager-less agent would fail with an
        # AttributeError before reaching it
        if not self._manager:
            raise AgentExecutionException("SyncAgent '%s' has no manager!" % \
                                          self._id)

        if currentTS is None:
            till = None
        else:
            till = currentTS / self._manager.getGranularity() - 1

        if logger:
            logger.info("Querying agent %s for events till %s" % \
                        (self.getId(), till))

        # query till currentTS - 1, for integrity reasons
        data = self._manager.query(agentId=self.getId(),
                                   till=till)

        try:
            records = self._generateRecords(data, till, dbi=dbi)
            # run agent-specific cycle
            result = self._run(records, logger=logger, monitor=monitor, dbi=dbi)
        except Exception:
            # best effort: a failing agent is logged and reported as
            # "nothing done". KeyboardInterrupt/SystemExit are not
            # swallowed, so that the process can still be stopped.
            if logger:
                logger.exception("Problem running agent %s" % self.getId())
            return None

        if result is None:
            return None

        # remember the limit, so that `acknowledge` can advance the pointer
        self._lastTry = till
        return self._lastTry

    def acknowledge(self):
        """
        Called to signal that the information has been correctly processed
        (usually called by periodic task)
        """
        self._manager.advance(self.getId(), self._lastTry)
class SyncManager(Persistent):
    """
    Stores live sync configuration parameters and "agents". It is basically an
    "Agent Manager"
    """

    def __init__(self, granularity=MPT_GRANULARITY):
        """
        :param granularity: integer, number of seconds per MPT entry
        """
        self._granularity = granularity
        self.reset()

    def getGranularity(self):
        """
        Returns the granularity that is set for the MPT
        """
        return self._granularity

    @classmethod
    def getDBInstance(cls):
        """
        Returns the instance of SyncManager currently in the DB.

        If the plugin storage has no 'agent_manager' entry yet, the DB
        structures are updated first and the storage is checked again.

        :returns: the stored manager, or ``None`` if there is still none
                  after updating the structures
        """
        storage = getPluginType().getStorage()
        if 'agent_manager' not in storage:
            root = DBMgr.getInstance().getDBConnection()
            updateDBStructures(root)
            # Previously the method ended here and implicitly returned
            # None, so the very first caller got no manager at all.
            # NOTE(review): assumes updateDBStructures() stores the new
            # manager under 'agent_manager' in the plugin storage -
            # confirm in indico.ext.livesync.db
            storage = getPluginType().getStorage()
            if 'agent_manager' not in storage:
                return None
        return storage['agent_manager']

    def reset(self, agentsOnly=False, trackOnly=False):
        """
        Resets database structures

        :param agentsOnly: if true, the track (MPT) is left untouched
        :param trackOnly: if true, the agent mapping is left untouched

        .. WARNING::
           This erases any agents and contents in the MPT
        """
        if not trackOnly:
            self._agents = mapping.PersistentMapping()
        if not agentsOnly:
            self._track = SetMultiPointerTrack()

    def registerNewAgent(self, agent):
        """
        Registers the agent, placing it in a mapping structure
        """
        self._agents[agent.getId()] = agent

        # create a new pointer in the track
        self._track.addPointer(agent.getId())

        # impose myself as its manager
        agent.setManager(self)

    def removeAgent(self, agent):
        """
        Removes an agent, along with its pointer in the track
        """
        self._track.removePointer(agent.getId())
        del self._agents[agent.getId()]

    def query(self, agentId=None, till=None):
        """
        Queries the agent for a given timespan

        :param agentId: ID of the agent whose pointer is the starting point
        :param till: upper limit, passed on to the track
        :raises QueryException: if no agent ID is given
        """

        # TODO: Add more criteria! (for now this will do)

        if agentId is None:
            raise QueryException("No criteria specified!")

        return self._track.pointerIterItems(agentId, till=till)

    def advance(self, agentId, newLastTS):
        """
        Advances the agent "pointer" to the specified timestamp
        """
        self._track.movePointer(agentId,
                                newLastTS)

    def add(self, timestamp, action):
        """
        Adds a specific action to the specified timestamp
        (the timestamp is divided by the granularity before being stored)
        """
        self._track.add(timestamp / self._granularity, action)

    def getTrack(self):
        """
        Returns the MPT
        """
        return self._track

    def getAllAgents(self):
        """
        Returns the agent dictionary
        """
        return self._agents

    def objectExcluded(self, obj):
        """
        Decides whether a particular object should be ignored or not

        The decision is taken by looking for the ID of a category in the
        'excludedCategories' plugin option: the object itself if it is a
        category, the owner if it is a conference, and otherwise the
        owner of the conference the object belongs to.
        """
        excluded = getPluginType().getOption('excludedCategories').getValue()
        if isinstance(obj, conference.Category):
            return obj.getId() in excluded
        elif isinstance(obj, conference.Conference):
            owner = obj.getOwner()
            if owner:
                return owner.getId() in excluded
        elif obj.getParent():
            conf = obj.getConference()
            if conf:
                owner = conf.getOwner()
                if owner:
                    return owner.getId() in excluded

        # no owner/conference could be found: do not exclude
        return False
class RecordUploader(object):
    """
    Takes care of sending records in batches. The upload itself is left
    to subclasses (see ``_uploadBatch``).
    """

    DEFAULT_BATCH_SIZE = 1000
    DEFAULT_NUM_SLAVES = 2

    def __init__(self, logger, agent, batchSize=DEFAULT_BATCH_SIZE):
        """
        :param logger: kept in ``_logger``
        :param agent: kept in ``_agent``
        :param batchSize: maximum number of records per batch
        """
        self._batchSize = batchSize
        self._agent = agent
        self._logger = logger

    def _uploadBatch(self, batch):
        """
        :param batch: list of records

        Performs the upload of one batch. Has to be provided by the
        concrete uploader; this base version always raises.
        """
        raise Exception("Unimplemented method!")

    def iterateOver(self, iterator, dbi=None):
        """
        Goes through the iterator till it is exhausted, uploading the
        records in batches of (at most) ``batchSize`` elements.
        `dbi` can be passed, so that the cache is cleared once in a while

        :returns: always ``True``
        """

        pending = []

        for record in iterator:

            # batch is full: send it before taking the new record
            if (self._batchSize - 1) < len(pending):
                self._uploadBatch(pending)
                pending = []

            pending.append(record)

            # NOTE(review): indentation was lost in the extracted source;
            # the abort is assumed to happen once per record, inside the
            # loop - confirm against the repository
            if dbi:
                dbi.abort()

        # whatever is left (last, possibly incomplete batch)
        if pending:
            self._uploadBatch(pending)

        return True