Tests should use the downloadable image, not the one in the working directory
[ci.git] / hbuild / scheduler.py
blob9f75a91dbd0e17e14f882b543b5a9c2178fcd63e
1 #!/usr/bin/env python3
4 # Copyright (c) 2017 Vojtech Horky
5 # All rights reserved.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 import subprocess
32 import os
33 import time
34 import datetime
35 import shutil
36 import sys
38 from threading import Lock, Condition
class Task:
    """Base class of a single build task.

    Subclasses override run(). Before execute() is called, ``self.ctl`` must
    be set to a TaskController; execute() uses it to collect the task's
    downloadable files and to finish (close) the task log.
    """

    def __init__(self, report_tag, **report_args):
        # Skeleton of the task report; 'attrs' are the extra keyword
        # arguments, later extended with 'duration'.
        self.report = {
            'name': report_tag,
            'result': 'unknown',
            'attrs': dict(report_args),
        }
        self.ctl = None

    def execute(self):
        """Run the task and record its result and duration in the report.

        Returns ``{'status': 'ok', 'data': data}`` where ``data`` is what
        run() returned (``{}`` when it returned None or True).
        Raises whatever run() raised, or Exception when run() returned
        False; in both cases the report result is set to 'fail'.
        """
        start_time = time.time()
        try:
            res = self.run()
            if res == False:
                raise Exception('run() returned False')
            self.report['result'] = 'ok'

            self.report['files'] = self.ctl.get_files()

            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            # A bare None/True carries no data. Dependents look keys up in
            # this data with ``in``, which raises TypeError on a bool.
            if res is None or res is True:
                res = {}

            self.ctl.done()

            return {
                'status': 'ok',
                'data': res
            }
        except Exception:
            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            self.report['result'] = 'fail'
            self.report['files'] = []
            # Do not mask the original error when no controller was set.
            if self.ctl is not None:
                self.ctl.done()
            raise

    def run(self):
        """Do the actual work; meant to be overridden by subclasses."""
        pass

    def get_report(self):
        """Return the (mutable) report dictionary of this task."""
        return self.report
class TaskException(Exception):
    """Base class of errors raised by tasks and their controllers."""

    def __init__(self, msg):
        super().__init__(msg)


class RunCommandException(TaskException):
    """Raised when an external command exits with a non-zero return code.

    Attributes:
        rc: return code of the command.
        output: list of captured output lines (empty unless the caller
            requested the output).
    """

    def __init__(self, msg, rc, output):
        # Go through TaskException so that any initialization done there
        # also applies to this subclass.
        super().__init__(msg)
        self.rc = rc
        self.output = output
class TaskController:
    """Helper through which a Task interacts with the build environment.

    It exposes data produced by the task's dependencies, runs external
    commands (mirroring their output into the task log), and manages the
    task log and the downloadable artefact files.
    """

    def __init__(self, name, data, build_directory, artefact_directory, printer, print_debug = False):
        self.name = name
        # Maps dependency task id -> data produced by that dependency.
        self.data = data
        # Downloadable files registered through add_downloadable_file().
        self.files = []
        # Open task log file; None when no log is set or after done().
        self.log = None
        # The last (at most 10) lines passed to append_line_to_log_file().
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        """Return a controller for task ``name`` sharing this controller's
        directories, printer and debug setting but with its own ``data``."""
        return TaskController(name, data, self.build_directory,
            self.artefact_directory, self.printer, self.print_debug_messages)

    def dprint(self, str, *args):
        """Print a debug message when debug output is enabled.

        As in the logging module, ``str`` is %-formatted only when ``args``
        are given, so already formatted text (possibly containing a literal
        ``%``) is printed verbatim instead of raising.
        """
        if self.print_debug_messages:
            message = (str % args) if args else str
            self.printer.print_debug(self.name, message)

    def get_dependency_data(self, dep, key=None):
        """Return ``key`` from the data of dependency ``dep``.

        When ``key`` is None, ``dep`` is treated as a key and looked up in
        all dependencies via get_data().
        """
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        """Return ``key`` from the first dependency whose data contains it.

        Raises TaskException when no dependency provides the key.
        """
        for dep_data in self.data.values():
            if key in dep_data:
                return dep_data[key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        """Run ``cmd`` (a list of arguments), logging its merged output.

        stdout and stderr are merged and every line is appended to the task
        log; with ``needs_output`` the lines are also collected.

        Returns a dict with 'output' (list of lines), 'stdout' and 'stderr'
        (both the joined merged output), 'rc' and 'failed'.
        Raises RunCommandException on a non-zero return code; its message
        contains the last output line.
        """
        # Pass the command as an argument: it may contain '%' characters.
        self.dprint("Running `%s'...", ' '.join(cmd))
        output = []
        last_line = ""

        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for raw_line in proc.stdout:
                # Tools may print non-UTF-8 bytes; replace them rather than
                # aborting in the middle of the command.
                line = raw_line.decode('utf-8', errors='replace').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            proc.wait()
        rc = proc.returncode
        if rc != 0:
            raise RunCommandException(
                "`%s' failed: %s" % (' '.join(cmd), last_line),
                rc, output)

        return {
            'output': output,
            'stdout': '\n'.join(output),
            'stderr': '\n'.join(output),
            'rc': rc,
            'failed': not rc == 0
        }

    def make_temp_dir(self, name):
        """Create ``name`` under the build directory (if missing) and
        return its absolute path."""
        dname = '%s/%s' % ( self.build_directory, name )
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        """Copy the contents of ``src_dir`` into ``dest_dir`` using rsync."""
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command([ 'rsync', '-a', src_dir + '/', dest_dir ])

    def set_log_file(self, log_filename):
        """Open ``logs/<log_filename>`` among the artefacts as the task log."""
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        # Do not leak a log opened by an earlier call.
        if self.log is not None:
            self.log.close()
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        """Write ``line`` to the log (if open) and remember it in log_tail."""
        if self.log is not None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        self.log_tail = self.log_tail[-10:]

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        """Return the absolute path of ``relative_name`` inside the artefact
        directory, optionally creating its parent directories."""
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % ( self.artefact_directory, base )

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    # TODO: propagate title + download_name to the report
    def add_downloadable_file(self, title, download_name, current_filename):
        """Copy ``current_filename`` to the artefacts as ``download_name``,
        register it under ``title`` and return the target path."""
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

        return target

    def open_downloadable_file(self, download_name, mode):
        """Open artefact ``download_name`` with ``mode``.

        Text-mode files are written as UTF-8 (matching the XML report's
        implicit encoding) instead of the locale's default encoding.
        """
        path = self.get_artefact_absolute_path(download_name, True)
        if 'b' in mode:
            return open(path, mode)
        return open(path, mode, encoding='utf-8')

    def done(self):
        """Close the task log. Safe to call repeatedly."""
        if self.log is not None:
            self.log.close()
            # Later appends only update log_tail instead of hitting a
            # closed file.
            self.log = None

    def get_files(self):
        """Return the list of registered downloadable files."""
        return self.files

    def ret(self, *args, **kwargs):
        """Build a task result ``{'status': ..., 'data': kwargs}``.

        An optional single positional argument gives the status; True and
        False map to 'ok' and 'fail'.
        """
        status = 'ok'
        if len(args) == 1:
            status = args[0]
            if status == True:
                status = 'ok'
            elif status == False:
                status = 'fail'
        return {
            'status': status,
            'data': dict(kwargs)
        }
class TaskWrapper:
    """Scheduler-side record of one submitted task and its outcome.

    Status, completion flag and result data are read and written from
    several threads, so every accessor below takes ``self.lock``.
    """

    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.dependencies = deps
        self.description = description
        self.task = task
        # 'n/a' until set_status() is called.
        self.status = 'n/a'
        # Explanation accompanying the status; None until set_status().
        self.reason = None
        self.completed = False
        self.data = {}
        self.lock = Lock()
        self.mutexes = mutexes

    def has_completed_okay(self):
        """True when the task has finished with status 'ok'."""
        with self.lock:
            return self.completed and (self.status == 'ok')

    def has_finished(self):
        """True once set_done() has been called."""
        with self.lock:
            return self.completed

    def get_status(self):
        with self.lock:
            return self.status

    def get_data(self):
        with self.lock:
            return self.data

    def set_status(self, status, reason):
        with self.lock:
            self.status = status
            self.reason = reason

    def set_done(self, data):
        """Store the task's result data and mark it as finished."""
        with self.lock:
            self.data = data
            self.completed = True

    def set_skipped(self, reason):
        self.set_status('skip', reason)
class BuildScheduler:
    """Concurrent executor of build tasks with dependencies and mutexes.

    Tasks are registered with submit(). A queue processor running in the
    thread pool starts a task once all its dependencies have finished and
    none of its mutexes is held; a task with a dependency that did not
    finish with 'ok' is skipped. Each task's result is appended to
    report.xml in the artefact directory, which done() finalizes.
    """

    def __init__(self, max_workers, build, artefact, build_id, printer, debug = False):
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }
        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer, debug)

        # Start the XML report
        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % build_id)

        # The following attributes (up to self.guard declaration) are guarded
        # by the self.guard mutex and use self.cond to notify about changes
        # in any of them.
        # Lower granularity of the locking would be possible but would
        # complicate too much the conditions inside queue processing where we
        # need to react to multiple events (new task added vs some task
        # terminated vs selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and TaskWrapper class.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at queue beginning (instead of appending) as a heuristic to
        # prevent accumulation of mutex tasks at the end of run where it could
        # hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag for the queue processing whether to terminate the loop to allow
        # clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes are
        # identified by their (string) name that is used as index. When the
        # value is True, the mutex is held (i.e. do not run any other task
        # claiming the same mutex), mutex is not held when the value is False
        # or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize CV only without attaching a lock as it creates one
        # automatically and CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in barrier).
        self.guard = Condition()

        # Lock serializing writes to (and closing of) the report file; the
        # writes come from several worker threads and text files are not
        # thread-safe.
        self.output_lock = Lock()

        # Executor for running of individual tasks (plus the queue processor)
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """Run process_queue(), printing any error (to allow debugging of the
        queue processor, whose exceptions the executor would otherwise keep
        silently)."""
        try:
            self.process_queue()
        except Exception:
            import traceback
            traceback.print_exc()

    def submit(self, description, task_id, task, deps = None, mutexes = None):
        """Register ``task`` under ``task_id`` and queue it for execution.

        ``deps`` are ids of already submitted tasks that must finish first,
        ``mutexes`` are names of mutexes held while the task runs.
        Raises Exception when a dependency id is not known.
        """
        # Copy the arguments: avoids shared mutable defaults and accepts any
        # iterable (a generator would otherwise be exhausted by the check
        # below before being stored).
        deps = list(deps) if deps is not None else []
        mutexes = list(mutexes) if mutexes is not None else []

        with self.guard:
            # Check that dependencies are known
            for d in deps:
                if d not in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)

            wrapper = TaskWrapper(task_id, task, description, deps, mutexes)
            self.tasks[task_id] = wrapper

            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks (we add it before the first
            # non-mutexed task).
            if mutexes:
                position = next(
                    (i for i, queued in enumerate(self.queue)
                        if not self.tasks[queued].mutexes),
                    len(self.queue))
                self.queue.insert(position, task_id)
            else:
                self.queue.append(task_id)

            self.guard.notify_all()

    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        """Run task_run_inner(), printing any error it raises."""
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except Exception:
            import traceback
            traceback.print_exc()

    def xml_escape_line(self, s):
        """Make ``s`` safe as XML character data.

        Control characters except TAB (including newlines) are dropped,
        &, < and > are escaped and non-ASCII characters become character
        references.
        """
        from xml.sax.saxutils import escape
        import re

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

    def xml_escape_attr_(self, value):
        """Make ``value`` (any type) safe inside a double-quoted XML attribute."""
        return self.xml_escape_line(str(value)).replace('"', '&quot;')

    def task_run_inner(self, wrapper, task_id, can_be_run):
        """Run (or skip) one task, report its result and release its slot.

        The final bookkeeping runs in ``finally``: if reporting fails, the
        task must still be marked done and its slot and mutexes released,
        otherwise barrier() and all dependent tasks would wait forever.
        """
        res = {'status': 'fail', 'data': {}}
        try:
            data = {}
            if can_be_run:
                for task_dep_id in wrapper.dependencies:
                    data[task_dep_id] = self.tasks[task_dep_id].get_data()

            wrapper.task.ctl = self.ctl.derive(task_id, data)
            wrapper.task.ctl.set_log_file('%s.log' % task_id)

            if can_be_run:
                res, reason = self.execute_task_(wrapper)
            else:
                res, reason = self.skip_task_(wrapper)

            status = res['status']
            self.write_task_report_(wrapper, status)

            wrapper.set_status(status, reason)
            self.announce_task_finished_(wrapper)
        finally:
            wrapper.set_done(res['data'])

            with self.guard:
                self.running_tasks_count = self.running_tasks_count - 1

                if can_be_run:
                    for m in wrapper.mutexes:
                        self.task_mutexes[m] = False

                self.guard.notify_all()

    def execute_task_(self, wrapper):
        """Execute a runnable task; return ``(result dict, failure reason)``."""
        self.announce_task_started_(wrapper)
        try:
            res = wrapper.task.execute()
        except Exception as e:
            return {'status': 'fail', 'data': {}}, '%s' % e

        if (res == True) or (res is None):
            res = {'status': 'ok', 'data': {}}
        elif res == False:
            res = {'status': 'fail', 'data': {}}
        return res, None

    def skip_task_(self, wrapper):
        """Skip a task with a failed dependency; return ``(result, reason)``."""
        ctl = wrapper.task.ctl
        reason = None
        for task_dep_id in wrapper.dependencies:
            task_dep = self.tasks[task_dep_id]
            if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                reason = 'dependency %s failed (or also skipped).' % task_dep_id
                ctl.append_line_to_log_file('Skipped: %s' % reason)
        # execute() (which normally closes the log) never runs for skipped
        # tasks, so close the log here to avoid leaking the file.
        ctl.done()
        return {'status': 'skip', 'data': {}}, reason

    def write_task_report_(self, wrapper, status):
        """Append the XML report element of ``wrapper`` to report.xml."""
        report = wrapper.task.get_report()
        with self.output_lock:
            if (report['name'] is None) or (self.report_file is None):
                return
            report['attrs']['result'] = status
            self.report_file.write(self.format_task_report_xml_(wrapper, report))

    def format_task_report_xml_(self, wrapper, report):
        """Return the XML element describing one task report (all values
        escaped, so quotes or '&' in attributes cannot break the XML)."""
        parts = ['<' + report['name']]
        for key in report['attrs']:
            parts.append(' %s="%s"' % (key, self.xml_escape_attr_(report['attrs'][key])))
        parts.append(' log="logs/%s.log"' % self.xml_escape_attr_(wrapper.id))
        parts.append(">\n")

        for f in report.get('files', ()):
            parts.append('<file title="%s" filename="%s" />\n' % (
                self.xml_escape_attr_(f['title']),
                self.xml_escape_attr_(f['filename'])))

        ctl = wrapper.task.ctl
        if (ctl is not None) and ctl.log_tail:
            parts.append(' <log>\n')
            for line in ctl.log_tail:
                parts.append(' <logline>' + self.xml_escape_line(line) + '</logline>\n')
            parts.append(' </log>\n')

        parts.append('</' + report['name'] + ">\n")
        return ''.join(parts)

    def process_queue(self):
        """Start queued tasks until done() sets ``terminate`` and the queue
        is empty. Runs in the executor (see process_queue_wrapper())."""
        while True:
            with self.guard:
                # Wait for a free slot and a queued task (or termination).
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return
                    self.guard.wait()

                # We have some tasks in the queue and we can run at
                # least one of them
                ( ready_task_id, can_be_run ) = self.get_first_ready_task_id_()

                if ready_task_id is None:
                    # Nothing is ready yet; wait for a task to finish or for
                    # a new one to be submitted.
                    self.guard.wait()
                    continue

                # Remove the task from the queue
                self.queue.remove(ready_task_id)

                ready_task = self.tasks[ready_task_id]

                # Need to update number of running tasks here and now
                # because the executor might start the execution later
                # and we would evaluate incorrectly the condition above
                # that we can start another task.
                self.running_tasks_count = self.running_tasks_count + 1

                if can_be_run:
                    for m in ready_task.mutexes:
                        self.task_mutexes[m] = True
                self.executor.submit(BuildScheduler.task_run_wrapper,
                    self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return ``(task_id, can_be_run)`` for the first queued task that can
        be started: ``can_be_run`` is False when the task must be skipped
        because a dependency did not finish with 'ok'.
        Returns ``(None, None)`` when no task can be started now.
        """
        # We assume self.guard was already acquired
        # We use here the for ... else construct of Python (recall that else
        # is taken when break is not used)
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[task_dep_id]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return ( task_id, False )
            else:
                if not any(self.task_mutexes.get(m, False) for m in task.mutexes):
                    return ( task_id, True )
        return ( None, None )

    def announce_task_started_(self, task):
        self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        """Print the outcome of ``task``, including its reason if any."""
        description = task.description
        if task.status == 'ok':
            msg = 'done'
            msg_color = self.printer.GREEN
            description = description + '.'
        else:
            if task.status == 'skip':
                msg = 'skip'
                msg_color = self.printer.CYAN
            else:
                msg = 'fail'
                msg_color = self.printer.RED
            # A failure may come without a reason (e.g. execute() returning
            # False); do not crash concatenating None.
            if task.reason is not None:
                description = description + ': ' + task.reason

        self.printer.print_finished(msg_color, msg, description)

    def barrier(self):
        """Block until the queue is empty and no task is running."""
        with self.guard:
            while (self.running_tasks_count > 0) or (len(self.queue) > 0):
                self.guard.wait()

    def done(self):
        """Wait for all tasks, finalize the report and stop the executor."""
        with self.guard:
            self.terminate = True
            self.guard.notify_all()
        self.barrier()
        self.close_report()
        self.executor.shutdown(True)

    def close_report(self):
        """Write the build summary, close report.xml (idempotent)."""
        with self.output_lock:
            if self.report_file is None:
                return
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, ( end_time - self.start_timestamp ) * 1000,
                self.max_workers
            ))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None