#!/usr/bin/env python3

#
# Copyright (c) 2017 Vojtech Horky
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# - Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
# - The name of the author may not be used to endorse or promote products
#   derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

import subprocess
import os
import time
import datetime
import shutil
import sys

from threading import Lock, Condition

class Task:
    def __init__(self, report_tag, **report_args):
        self.report = {
            'name': report_tag,
            'result': 'unknown',
            'attrs': {},
        }
        for k in report_args:
            self.report['attrs'][k] = report_args[k]
        self.ctl = None

    def execute(self):
        start_time = time.time()
        try:
            res = self.run()
            if res == False:
                raise Exception('run() returned False')
            self.report['result'] = 'ok'

            self.report['files'] = self.ctl.get_files()

            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            if res is None:
                res = {}

            self.ctl.done()

            return {
                'status': 'ok',
                'data': res
            }
        except Exception as e:
            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            self.report['result'] = 'fail'
            self.report['files'] = self.ctl.get_files()
            self.ctl.done()
            raise e

    def run(self):
        pass

    def get_report(self):
        return self.report
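
# A minimal usage sketch (illustrative only, not part of the original file):
# concrete tasks subclass Task, override run(), and use the TaskController
# helpers exposed through self.ctl. The class name, report tag and command
# below are hypothetical. run() may return None or True on success, or a
# plain dict of values that dependent tasks can later read.
class ExampleEchoTask(Task):
    def __init__(self):
        Task.__init__(self, 'echo', description='illustrative example')

    def run(self):
        # run_command() logs the output and returns a dict with
        # 'output', 'stdout', 'stderr', 'rc' and 'failed' keys.
        res = self.ctl.run_command(['echo', 'hello'], needs_output=True)
        return {'greeting': res['stdout']}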

class TaskException(Exception):
    def __init__(self, msg):
        Exception.__init__(self, msg)

class RunCommandException(TaskException):
    def __init__(self, msg, rc, output):
        self.rc = rc
        self.output = output
        Exception.__init__(self, msg)

class TaskController:
    def __init__(self, name, data, build_directory, artefact_directory, printer, kept_log_lines, print_debug = False):
        self.name = name
        self.data = data
        self.files = []
        self.log = None
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.kept_log_lines = kept_log_lines
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        return TaskController(name, data, self.build_directory, self.artefact_directory,
            self.printer, self.kept_log_lines, self.print_debug_messages)

    def dprint(self, str, *args):
        if self.print_debug_messages:
            self.printer.print_debug(self.name, str % args)

    def get_dependency_data(self, dep, key=None):
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        for dep in self.data:
            if key in self.data[dep]:
                return self.data[dep][key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        self.dprint("Running `%s'..." % ' '.join(cmd))
        output = []
        last_line = ""
        rc = 0

        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for line in proc.stdout:
                line = line.decode('utf-8').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            proc.wait()
            rc = proc.returncode
            if rc != 0:
                #self.dprint('stderr=%s' % stderr.strip('\n'))
                raise RunCommandException(
                    "`%s' failed: %s" % (' '.join(cmd), last_line),
                    rc, output)

        return {
            'output': output,
            'stdout': '\n'.join(output),
            'stderr': '\n'.join(output),
            'rc': rc,
            'failed': not rc == 0
        }

    def make_temp_dir(self, name):
        dname = '%s/%s' % ( self.build_directory, name )
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command([ 'rsync', '-a', src_dir + '/', dest_dir ])

    def remove_silently(self, path):
        try:
            os.remove(path)
        except OSError as ex:
            pass

    def set_log_file(self, log_filename):
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        if not self.log is None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        self.log_tail = self.log_tail[-self.kept_log_lines:]

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % ( self.artefact_directory, base )

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    # TODO: propagate title + download_name to the report
    def add_downloadable_file(self, title, download_name, current_filename):
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

        return target

    def move_dir_to_downloadable(self, title, download_name, current_dirname):
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.move(current_dirname, target)

        return target

    def open_downloadable_file(self, download_name, mode):
        return open(self.get_artefact_absolute_path(download_name, True), mode)

    def done(self):
        if not self.log is None:
            self.log.close()

    def get_files(self):
        return self.files

    def ret(self, *args, **kwargs):
        status = 'ok'
        if len(args) == 1:
            status = args[0]
            if status == True:
                status = 'ok'
            elif status == False:
                status = 'fail'
        result = {
            'status': status,
            'data': {}
        }
        for k in kwargs:
            result['data'][k] = kwargs[k]

        return result
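
# Another illustrative sketch (not in the original file): a task that produces
# a file in its build directory and publishes it as a downloadable artefact.
# The class name, report tag, command and file names are hypothetical; only
# the TaskController helpers (make_temp_dir, run_command,
# add_downloadable_file) are taken from the code above.
class ExampleArchiveTask(Task):
    def __init__(self):
        Task.__init__(self, 'archive')

    def run(self):
        work_dir = self.ctl.make_temp_dir('example')
        # Produce some output file in the build directory (hypothetical step).
        self.ctl.run_command(['touch', 'result.txt'], cwd=work_dir)
        # Copy it under the artefact directory and record it for the report.
        self.ctl.add_downloadable_file('Example result', 'files/result.txt',
            os.path.join(work_dir, 'result.txt'))
        return {'result': 'files/result.txt'}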


class TaskWrapper:
    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.dependencies = deps
        self.description = description
        self.task = task
        self.status = 'n/a'
        self.completed = False
        self.data = {}
        self.lock = Lock()
        self.mutexes = mutexes

    def has_completed_okay(self):
        with self.lock:
            return self.completed and (self.status == 'ok')

    def has_finished(self):
        with self.lock:
            return self.completed

    def get_status(self):
        with self.lock:
            return self.status

    def get_data(self):
        with self.lock:
            return self.data

    def set_status(self, status, reason):
        with self.lock:
            self.status = status
            self.reason = reason

    def set_done(self, data):
        with self.lock:
            self.data = data
            self.completed = True

    def set_skipped(self, reason):
        self.set_status('skip', reason)


class BuildScheduler:
    def __init__(self, max_workers, build, artefact, build_id, printer, inline_log_lines = 10, debug = False):
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }

        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer, inline_log_lines, debug)

        # Start the log file
        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % build_id)

        # The following attributes (up to the self.guard declaration) are
        # guarded by the self.guard mutex and use self.guard to notify about
        # changes in any of them.
        # Finer-grained locking would be possible but would complicate the
        # conditions inside the queue processing too much, where we need to
        # react to multiple events (new task added vs. some task terminated
        # vs. selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and a TaskWrapper instance.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at the queue beginning (instead of appending) as a heuristic
        # to prevent accumulation of mutex tasks at the end of the run where
        # they could hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control the number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag telling the queue processing whether to terminate the loop to
        # allow clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes
        # are identified by their (string) name that is used as index. When
        # the value is True, the mutex is held (i.e. do not run any other
        # task claiming the same mutex); the mutex is not held when the value
        # is False or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize the CV without attaching a lock as it creates one
        # automatically and the CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in barrier).
        self.guard = Condition()

        # Lock guarding output synchronization
        self.output_lock = Lock()

        # Executor for running individual tasks
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """
        To allow debugging of the queue processor.
        """
        try:
            self.process_queue()
        except:
            import traceback
            traceback.print_exc()

    def submit(self, description, task_id, task, deps = [], mutexes = []):
        with self.guard:
            #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
            # Check that dependencies are known
            for d in deps:
                if not d in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)
            # Add the wrapper
            wrapper = TaskWrapper(task_id, task, description, deps, mutexes)
            self.tasks[task_id] = wrapper

            # Append to the queue.
            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks. (We add before the first
            # non-mutexed task.)
            if len(mutexes) > 0:
                new_queue = []
                inserted = False
                for q in self.queue:
                    if (len(self.tasks[q].mutexes) == 0) and (not inserted):
                        new_queue.append(task_id)
                        inserted = True
                    new_queue.append(q)
                if not inserted:
                    new_queue.append(task_id)
                self.queue = new_queue
            else:
                self.queue.append(task_id)

            self.guard.notify_all()
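
    # Illustrative only (not in the original file): a caller would typically
    # submit tasks with explicit dependencies and optional mutexes, e.g.
    # (task classes, ids and the mutex name here are hypothetical):
    #
    #   scheduler.submit("Echo greeting", "echo", ExampleEchoTask())
    #   scheduler.submit("Create artefact", "archive", ExampleArchiveTask(),
    #       deps = [ "echo" ], mutexes = [ "disk-io" ])
    #
    # A dependent task can later read the data returned by its dependency's
    # run() via self.ctl.get_dependency_data("echo", "greeting").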

    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except:
            import traceback
            traceback.print_exc()

    def xml_escape_line(self, s):
        from xml.sax.saxutils import escape
        import re

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

    def task_run_inner(self, wrapper, task_id, can_be_run):
        data = {}

        if can_be_run:
            for task_dep_id in wrapper.dependencies:
                task_dep = self.tasks[task_dep_id]
                data[task_dep_id] = task_dep.get_data()

        wrapper.task.ctl = self.ctl.derive(task_id, data)
        wrapper.task.ctl.set_log_file('%s.log' % task_id)

        if can_be_run:
            self.announce_task_started_(wrapper)

            try:
                res = wrapper.task.execute()
                if (res == True) or (res is None):
                    res = {
                        'status': 'ok',
                        'data': {}
                    }
                elif res == False:
                    res = {
                        'status': 'fail',
                        'data': {}
                    }
                reason = None
            except Exception as e:
                import traceback
                res = {
                    'status': 'fail',
                    'data': {}
                }
                #traceback.print_exc()
                reason = '%s' % e
        else:
            for task_dep_id in wrapper.dependencies:
                task_dep = self.tasks[task_dep_id]
                if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                    reason = 'dependency %s failed (or also skipped).' % task_dep_id
            res = {
                'status': 'skip',
                'data': {}
            }
            wrapper.task.ctl.append_line_to_log_file('Skipped: %s' % reason)

        status = res['status']
        report = wrapper.task.get_report()

        if (not report['name'] is None) and (not self.report_file is None):
            report_xml = '<' + report['name']
            report['attrs']['result'] = status
            for key in report['attrs']:
                report_xml = report_xml + ' %s="%s"' % (key, report['attrs'][key] )
            report_xml = report_xml + ' log="logs/%s.log"' % wrapper.id
            report_xml = report_xml + ">\n"

            if 'files' in report:
                for f in report['files']:
                    file = '<file title="%s" filename="%s" />\n' % ( f['title'], f['filename'])
                    report_xml = report_xml + file

            if (not wrapper.task.ctl is None) and (len(wrapper.task.ctl.log_tail) > 0):
                report_xml = report_xml + ' <log>\n'
                for line in wrapper.task.ctl.log_tail:
                    report_xml = report_xml + '  <logline>' + self.xml_escape_line(line) + '</logline>\n'
                report_xml = report_xml + ' </log>\n'

            report_xml = report_xml + '</' + report['name'] + ">\n"

            self.report_file.write(report_xml)

        wrapper.set_status(status, reason)
        self.announce_task_finished_(wrapper)
        wrapper.set_done(res['data'])

        with self.guard:
            self.running_tasks_count = self.running_tasks_count - 1

            if can_be_run:
                for m in wrapper.mutexes:
                    self.task_mutexes[ m ] = False

            #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
            self.guard.notify_all()

    def process_queue(self):
        while True:
            with self.guard:
                #print("Process queue running, tasks {}".format(len(self.queue)))
                # Break inside the loop
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return

                    #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after waiting for free slots.")

                # We have some tasks in the queue and we can run at
                # least one of them
                ( ready_task_id, can_be_run ) = self.get_first_ready_task_id_()
                #print("Ready task is {}".format(ready_task_id))

                if ready_task_id is None:
                    #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after no ready task.")
                else:
                    # Remove the task from the queue
                    self.queue.remove(ready_task_id)

                    ready_task = self.tasks[ready_task_id]

                    # Need to update the number of running tasks here and now
                    # because the executor might start the execution later
                    # and we would incorrectly evaluate the condition above
                    # that we can start another task.
                    self.running_tasks_count = self.running_tasks_count + 1

                    #print("Ready is {}".format(ready_task))
                    if can_be_run:
                        for m in ready_task.mutexes:
                            self.task_mutexes[ m ] = True
                    #print("Actually starting task {}".format(ready_task_id))
                    self.executor.submit(BuildScheduler.task_run_wrapper,
                        self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return a tuple with the first task that can be run (or that failed
        immediately), together with a flag telling whether the task can
        actually be executed (True) or whether its result is already
        predetermined (False).
        Returns ( None, None ) when no task can be run.
        """
        # We assume self.guard was already acquired.
        # We use the for ... else construct of Python here (recall that else
        # is taken when break is not used).
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[ task_dep_id ]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return ( task_id, False )
            else:
                for task_mutex in task.mutexes:
                    if (task_mutex in self.task_mutexes) and self.task_mutexes[ task_mutex ]:
                        break
                else:
                    return ( task_id, True )
        return ( None, None )

    def announce_task_started_(self, task):
        self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        description = task.description
        if task.status == 'ok':
            msg = 'done'
            msg_color = self.printer.GREEN
            description = description + '.'
        elif task.status == 'skip':
            msg = 'skip'
            msg_color = self.printer.CYAN
            description = description + ': ' + task.reason
        else:
            msg = 'fail'
            msg_color = self.printer.RED
            description = description + ': ' + task.reason

        self.printer.print_finished(msg_color, msg, description)

    def barrier(self):
        with self.guard:
            #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
            while (self.running_tasks_count > 0) or (len(self.queue) > 0):
                #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
                self.guard.wait()

    def done(self):
        with self.guard:
            self.terminate = True
            self.guard.notify_all()
        self.barrier()
        self.close_report()
        self.executor.shutdown(True)

    def close_report(self):
        if not self.report_file is None:
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, ( end_time - self.start_timestamp ) * 1000,
                self.max_workers
            ))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None
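

# End-to-end usage sketch (illustrative only, not part of the original file).
# It assumes a printer object compatible with TaskController/BuildScheduler,
# i.e. one providing print_starting(), print_finished(), print_debug() and
# the GREEN/CYAN/RED attributes; the directories, build id and task ids are
# made up, and the Example* tasks are the hypothetical sketches defined above.
def example_build(printer):
    scheduler = BuildScheduler(max_workers=4,
        build='/tmp/build', artefact='/tmp/artefacts',
        build_id='42', printer=printer)

    # Tasks run concurrently up to max_workers, honoring dependencies
    # and mutexes (see submit() above).
    scheduler.submit("Echo greeting", "echo", ExampleEchoTask())
    scheduler.submit("Create example artefact", "archive", ExampleArchiveTask(),
        deps=[ "echo" ])

    # Wait for all submitted tasks, then close report.xml and the executor.
    scheduler.done()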