3 # Copyright 2009 the Melange authors.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 '"Sverre Rabbelier" <sverre@rabbelier.nl>',
22 '"Lennard de Rijk" <ljvderijk@gmail.com>',
28 from google
.appengine
.ext
import db
29 from google
.appengine
.runtime
import DeadlineExceededError
31 from soc
.cron
import student_proposal_mailer
32 from soc
.models
.job
import Job
34 class Error(Exception):
35 """Base class for all exceptions raised by this module.
40 class FatalJobError(Error
):
41 """Class for all errors that lead to immediate job abortion.
46 class Handler(object):
47 """A handler that dispatches a cron job.
49 The functions registered in self.tasks are called once a worker
50 has claimed the job. However, there is no guarantee as to how long
51 the task will be allowed to run. If an Exception is raised the task
52 is automatically rescheduled for execution.
56 """Constructs a new Handler with all known jobs set.
60 self
.ALREADY_CLAIMED
= 1
66 self
.tasks
['setupStudentProposalMailing'] = \
67 student_proposal_mailer
.setupStudentProposalMailing
68 self
.tasks
['sendStudentProposalMail'] = \
69 student_proposal_mailer
.sendStudentProposalMail
71 def claimJob(self
, job_key
):
72 """A transaction to claim a job.
74 The transaction is rolled back if the status is not 'waiting'.
77 job
= Job
.get_by_id(job_key
)
79 if job
.status
!= 'waiting':
82 job
.status
= 'started'
89 def timeoutJob(self
, job
):
92 If a job has timed out more than 50 times, the job is aborted.
98 job
.status
= 'aborted'
100 job
.status
= 'waiting'
104 job_id
= job
.key().id()
105 logging
.debug("job %d now timeout %d time(s)" % (job_id
, job
.timeouts
))
107 def failJob(self
, job
):
110 If the job has failed more than 5 times, the job is aborted.
116 job
.status
= 'aborted'
118 job
.status
= 'waiting'
122 job_id
= job
.key().id()
123 logging
.warning("job %d now failed %d time(s)" % (job_id
, job
.errors
))
125 def finishJob(self
, job
):
129 job
.status
= 'finished'
132 def abortJob(self
, job
):
136 job
.status
= 'aborted'
139 def handle(self
, job_key
):
142 Returns: one of the following status codes:
143 self.OUT_OF_TIME: returned when a DeadlineExceededError is raised
144 self.ALREADY_CLAIMED: if job.status is not 'waiting'
145 self.SUCCESS: if the job.status has been set to 'finished'
146 self.ABORTED: if the job.status has been set to 'aborted'
147 self.ERRORED: if the job encountered an error
153 job
= db
.run_in_transaction(self
.claimJob
, job_key
)
156 # someone already claimed the job
157 return self
.ALREADY_CLAIMED
159 if job
.task_name
not in self
.tasks
:
160 logging
.error("Unknown job %s" % job
.task_name
)
161 db
.run_in_transaction(self
.abortJob
, job_key
)
164 task
= self
.tasks
[job
.task_name
]
166 # execute the actual job
171 except DeadlineExceededError
, exception
:
174 return self
.OUT_OF_TIME
175 except FatalJobError
, exception
:
176 logging
.exception(exception
)
180 except Exception, exception
:
181 logging
.exception(exception
)
186 def iterate(self
, jobs
, retry_jobs
):
187 """Trivial iterator that iterates over jobs then retry_jobs