Fixes in Coastline building process
[ci.git] / hbuild / scheduler.py
blobc7bf82df3ca17d0534ebb4ef214d137f488a1c95
1 #!/usr/bin/env python3
4 # Copyright (c) 2017 Vojtech Horky
5 # All rights reserved.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 import subprocess
32 import os
33 import time
34 import datetime
35 import shutil
36 import sys
38 from threading import Lock, Condition
class Task:
    """
    Base class of a single unit of work.

    Subclasses override run(); execute() wraps it with timing and with
    book-keeping of the report dictionary (keys 'name', 'result', 'attrs'
    and, once execute() finished, 'files').
    """

    def __init__(self, report_tag, **report_args):
        """
        Create the task.

        report_tag is stored as the report name, the keyword arguments
        become the initial report attributes.
        """
        self.report = {
            'name': report_tag,
            'result': 'unknown',
            'attrs': dict(report_args),
        }
        # Task controller. It is assigned from outside before execute()
        # is called; it stays None when the task is executed standalone.
        self.ctl = None

    def execute(self):
        """
        Run the task and record result, files and duration in the report.

        Returns a dictionary {'status': 'ok', 'data': <value of run()>}
        where a None result of run() is replaced by an empty dictionary.
        Any exception raised by run() is re-raised after the report was
        marked as failed; a run() result equal to False is turned into
        an exception too.
        """
        start_time = time.time()
        try:
            res = self.run()
            # Loose comparison is kept on purpose: every value that
            # compares equal to False signals a failure.
            if res == False:
                raise Exception('run() returned False')
            self.report['result'] = 'ok'

            # Without a controller there are no downloadable files.
            if self.ctl is None:
                self.report['files'] = []
            else:
                self.report['files'] = self.ctl.get_files()

            self._record_duration(start_time)

            if res is None:
                res = {}

            self._close_controller()

            return {
                'status': 'ok',
                'data': res
            }
        except Exception:
            self._record_duration(start_time)

            self.report['result'] = 'fail'
            self.report['files'] = []
            # Guarded so that a missing controller does not replace the
            # real error by an AttributeError.
            self._close_controller()
            raise

    def _record_duration(self, start_time):
        """Store the time elapsed since start_time in milliseconds."""
        self.report['attrs']['duration'] = (time.time() - start_time) * 1000

    def _close_controller(self):
        """Tell the controller (if there is one) that the task is over."""
        if self.ctl is not None:
            self.ctl.done()

    def run(self):
        """Do the actual work; to be overridden by subclasses."""
        pass

    def get_report(self):
        """Return the (mutable) report dictionary of this task."""
        return self.report
class TaskException(Exception):
    """Generic error raised by task helpers; carries only a message."""

    def __init__(self, msg):
        super().__init__(msg)
class RunCommandException(TaskException):
    """
    Error describing a failed external command.

    Besides the message it carries the return code (rc) and the captured
    output lines (output) of the command.
    """

    def __init__(self, msg, rc, output):
        super().__init__(msg)
        self.rc = rc
        self.output = output
class TaskController:
    """
    Helper object handed to a task: gives access to data of dependencies,
    runs external commands, keeps the task log and registers downloadable
    artefacts.
    """

    def __init__(self, name, data, build_directory, artefact_directory, printer, kept_log_lines, print_debug = False):
        """
        name is used as a prefix of debug messages, data maps dependency
        ids to dictionaries with their results, kept_log_lines is the
        number of last log lines remembered in log_tail.
        """
        self.name = name
        self.data = data
        # Descriptions (title + filename) of registered downloadable files
        self.files = []
        # Log file object, None until set_log_file() is called
        self.log = None
        # Last kept_log_lines lines appended to the log
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.kept_log_lines = kept_log_lines
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        """Create a controller with a new name and data but same settings."""
        return TaskController(name, data, self.build_directory, self.artefact_directory,
            self.printer, self.kept_log_lines, self.print_debug_messages)

    def dprint(self, str, *args):
        """Print a %-formatted debug message (only when debugging is on)."""
        if self.print_debug_messages:
            self.printer.print_debug(self.name, str % args)

    def get_dependency_data(self, dep, key=None):
        """
        Return value of key as provided by dependency dep.

        With a single argument the argument is taken as a key that is
        looked up in data of all dependencies (see get_data).
        """
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        """
        Return value of key from the first dependency that provides it.

        Raises TaskException when no dependency provides the key.
        """
        for dep in self.data:
            if key in self.data[dep]:
                return self.data[dep][key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        """
        Run an external command (list of arguments), logging its output.

        Standard error is merged into standard output, hence the 'stdout'
        and 'stderr' items of the returned dictionary are the same text.
        Output lines are collected only when needs_output is set.

        Raises RunCommandException when the command exits with non-zero
        return code.
        """
        # Pass the command as an argument: dprint() applies the % operator
        # itself and would choke on a command containing a percent sign.
        self.dprint("Running `%s'...", ' '.join(cmd))
        output = []
        last_line = ""
        rc = 0

        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for raw_line in proc.stdout:
                # Commands are free to print bytes that are not valid
                # UTF-8; replace them instead of failing the whole task.
                line = raw_line.decode('utf-8', errors='replace').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            proc.wait()
            rc = proc.returncode

        if rc != 0:
            raise RunCommandException(
                "`%s' failed: %s" % (' '.join(cmd), last_line),
                rc, output)

        joined_output = '\n'.join(output)
        return {
            'output': output,
            'stdout': joined_output,
            'stderr': joined_output,
            'rc': rc,
            'failed': rc != 0
        }

    def make_temp_dir(self, name):
        """Create directory name inside the build directory, return its absolute path."""
        dname = '%s/%s' % ( self.build_directory, name )
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        """Copy contents of src_dir into dest_dir using rsync."""
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command([ 'rsync', '-a', src_dir + '/', dest_dir ])

    def set_log_file(self, log_filename):
        """Open log_filename (inside logs/ of the artefacts) as the task log."""
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        """Write the line to the log (if opened) and remember it in log_tail."""
        if self.log is not None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        if self.kept_log_lines > 0:
            self.log_tail = self.log_tail[-self.kept_log_lines:]
        else:
            # A slice [-0:] would keep everything instead of nothing.
            self.log_tail = []

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        """
        Return absolute path of relative_name inside the artefact
        directory, optionally creating the directories leading to it.
        """
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % ( self.artefact_directory, base )

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    # TODO: propagate title + download_name to the report
    def add_downloadable_file(self, title, download_name, current_filename):
        """Register a downloadable file and copy it to the artefacts."""
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

        return target

    def move_dir_to_downloadable(self, title, download_name, current_dirname):
        """Register a downloadable directory and move it to the artefacts."""
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'filename' : download_name,
            'title' : title
        })

        target = self.get_artefact_absolute_path(download_name, True)
        shutil.move(current_dirname, target)

        return target

    def open_downloadable_file(self, download_name, mode):
        """Open a file inside the artefact directory (creating directories)."""
        return open(self.get_artefact_absolute_path(download_name, True), mode)

    def done(self):
        """Close the log file (if any was opened)."""
        if self.log is not None:
            self.log.close()

    def get_files(self):
        """Return list of registered downloadable files."""
        return self.files

    def ret(self, *args, **kwargs):
        """
        Build a result dictionary {'status': ..., 'data': {...}}.

        The optional single positional argument is the status (True and
        False are translated to 'ok' and 'fail'), the keyword arguments
        become the data.
        """
        status = 'ok'
        if len(args) == 1:
            status = args[0]
            if status == True:
                status = 'ok'
            elif status == False:
                status = 'fail'

        return {
            'status': status,
            'data': dict(kwargs)
        }
class TaskWrapper:
    """
    Scheduler-side record of a submitted task: the task itself, its
    dependencies and mutexes and its current state. The state (status,
    reason, completed, data) is protected by a per-wrapper lock.
    """

    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.dependencies = deps
        self.description = description
        self.task = task
        self.status = 'n/a'
        # Explanation of the status; set together with the status but
        # initialized here so that it can be always read safely.
        self.reason = None
        self.completed = False
        self.data = {}
        self.lock = Lock()
        self.mutexes = mutexes

    def has_completed_okay(self):
        """Tell whether the task finished and its status is 'ok'."""
        with self.lock:
            return self.completed and (self.status == 'ok')

    def has_finished(self):
        """Tell whether the task finished (with whatever status)."""
        with self.lock:
            return self.completed

    def get_status(self):
        """Return the current status string."""
        with self.lock:
            return self.status

    def get_data(self):
        """Return data stored by set_done()."""
        with self.lock:
            return self.data

    def set_status(self, status, reason):
        """Set status together with its explanation."""
        with self.lock:
            self.status = status
            self.reason = reason

    def set_done(self, data):
        """Store the resulting data and mark the task as finished."""
        with self.lock:
            self.data = data
            self.completed = True

    def set_skipped(self, reason):
        """Set the status to 'skip' with the given explanation."""
        self.set_status('skip', reason)
class BuildScheduler:
    """
    Runs submitted tasks in a thread pool, honoring their dependencies
    and mutexes, and writes an XML report about the whole build.
    """

    def __init__(self, max_workers, build, artefact, build_id, printer, inline_log_lines = 10, debug = False):
        """
        max_workers limits the number of concurrently executing tasks,
        build and artefact are the build and artefact directories,
        build_id goes to the report, printer is used for progress
        messages and inline_log_lines is the number of last log lines
        of each task copied into the report.
        """
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }

        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer, inline_log_lines, debug)

        # Start the log file
        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % self.xml_escape_attr_(build_id))

        # The following attributes (up to self.guard declaration) are guarded
        # by the self.guard mutex and use self.cond to notify about changes
        # in any of them.
        # Lower granularity of the locking would be possible but would
        # complicate too much the conditions inside queue processing where we
        # need to react to multiple events (new task added vs some task
        # terminated vs selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and TaskWrapper class.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at queue beginning (instead of appending) as a heuristic to
        # prevent accumulation of mutex tasks at the end of run where it could
        # hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag for the queue processing whether to terminate the loop to allow
        # clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes are
        # identified by their (string) name that is used as index. When the
        # value is True, the mutex is held (i.e. do not run any other task
        # claiming the same mutex), mutex is not held when the value is False
        # or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize CV only without attaching a lock as it creates one
        # automatically and CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in barrier).
        self.guard = Condition()

        # Lock guarding output synchronization
        self.output_lock = Lock()

        # Executor for running of individual tasks
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """
        To allow debugging of the queue processor.
        """
        try:
            self.process_queue()
        except BaseException:
            # Top of a worker thread: the executor would store the
            # exception silently, hence print everything.
            import traceback
            traceback.print_exc()

    def submit(self, description, task_id, task, deps = None, mutexes = None):
        """
        Add a task to the queue.

        deps is a list of ids of already submitted tasks this task depends
        on, mutexes is a list of names of mutexes the task needs to hold
        while running. Raises Exception for an unknown dependency.
        """
        # Fresh lists instead of shared mutable default arguments
        if deps is None:
            deps = []
        if mutexes is None:
            mutexes = []

        with self.guard:
            # Check that dependencies are known
            for d in deps:
                if d not in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)
            # Add the wrapper
            wrapper = TaskWrapper(task_id, task, description, deps, mutexes)
            self.tasks[task_id] = wrapper

            # Append to the queue
            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks. (We add before first non-mutexed
            # task.)
            if len(mutexes) > 0:
                new_queue = []
                inserted = False
                for q in self.queue:
                    if (len(self.tasks[q].mutexes) == 0) and (not inserted):
                        new_queue.append(task_id)
                        inserted = True
                    new_queue.append(q)
                if not inserted:
                    new_queue.append(task_id)
                self.queue = new_queue
            else:
                self.queue.append(task_id)

            self.guard.notify_all()

    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        """Run task_run_inner and print any exception escaping from it."""
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except BaseException:
            # Top of a worker thread, see process_queue_wrapper.
            import traceback
            traceback.print_exc()

    def xml_escape_line(self, s):
        """
        Return s as pure-ASCII XML text: control characters (except tab)
        are dropped, markup characters are escaped and non-ASCII
        characters are replaced by numeric character references.
        """
        from xml.sax.saxutils import escape
        import re

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

    def xml_escape_attr_(self, value):
        """
        Return value converted to a string that is safe to be placed
        between double quotes of an XML attribute.
        """
        from xml.sax.saxutils import escape
        return escape(str(value), {'"': '&quot;'})

    def execute_task_(self, wrapper):
        """
        Execute the task, return tuple (result dictionary, failure reason).
        Exception raised by the task is turned into a 'fail' result.
        """
        self.announce_task_started_(wrapper)

        try:
            res = wrapper.task.execute()
            if (res == True) or (res is None):
                res = {
                    'status': 'ok',
                    'data': {}
                }
            elif res == False:
                res = {
                    'status': 'fail',
                    'data': {}
                }
            return ( res, None )
        except Exception as e:
            res = {
                'status': 'fail',
                'data': {}
            }
            return ( res, '%s' % e )

    def skip_task_(self, wrapper):
        """
        Record in the task log which dependencies prevent running the task,
        return tuple ('skip' result dictionary, reason).
        """
        reason = None
        for task_dep_id in wrapper.dependencies:
            task_dep = self.tasks[task_dep_id]
            if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                reason = 'dependency %s failed (or also skipped).' % task_dep_id
                wrapper.task.ctl.append_line_to_log_file('Skipped: %s' % reason)

        # Nobody else closes the log of a task that is not executed.
        wrapper.task.ctl.done()

        res = {
            'status': 'skip',
            'data': {}
        }
        return ( res, reason )

    def format_task_report_(self, wrapper, report, status):
        """
        Return the XML element describing the finished task. Stores the
        status as the 'result' attribute of the report.
        """
        report['attrs']['result'] = status

        report_xml = '<' + report['name']
        for key in report['attrs']:
            report_xml = report_xml + ' %s="%s"' % (key, self.xml_escape_attr_(report['attrs'][key]))
        report_xml = report_xml + ' log="logs/%s.log"' % self.xml_escape_attr_(wrapper.id)
        report_xml = report_xml + ">\n"

        if 'files' in report:
            for f in report['files']:
                report_xml = report_xml + '<file title="%s" filename="%s" />\n' % (
                    self.xml_escape_attr_(f['title']), self.xml_escape_attr_(f['filename']))

        if (wrapper.task.ctl is not None) and (len(wrapper.task.ctl.log_tail) > 0):
            report_xml = report_xml + ' <log>\n'
            for line in wrapper.task.ctl.log_tail:
                report_xml = report_xml + ' <logline>' + self.xml_escape_line(line) + '</logline>\n'
            report_xml = report_xml + ' </log>\n'

        return report_xml + '</' + report['name'] + ">\n"

    def task_run_inner(self, wrapper, task_id, can_be_run):
        """
        Execute (or skip when can_be_run is False) the task, write its
        report and update the scheduler state.

        The state (task marked as done, number of running tasks, held
        mutexes) is updated even when an exception escapes: otherwise
        barrier() would wait forever for a task that never finishes.
        """
        result_data = {}

        try:
            data = {}

            if can_be_run:
                for task_dep_id in wrapper.dependencies:
                    task_dep = self.tasks[task_dep_id]
                    data[task_dep_id] = task_dep.get_data()

            wrapper.task.ctl = self.ctl.derive(task_id, data)
            wrapper.task.ctl.set_log_file('%s.log' % task_id)

            if can_be_run:
                ( res, reason ) = self.execute_task_(wrapper)
            else:
                ( res, reason ) = self.skip_task_(wrapper)

            status = res['status']
            result_data = res['data']
            report = wrapper.task.get_report()

            if (report['name'] is not None) and (self.report_file is not None):
                # NOTE(review): several worker threads may get here at the
                # same time and the write is not synchronized - confirm
                # whether self.output_lock was meant to be taken here.
                self.report_file.write(self.format_task_report_(wrapper, report, status))

            wrapper.set_status(status, reason)
            self.announce_task_finished_(wrapper)
        finally:
            wrapper.set_done(result_data)

            with self.guard:
                self.running_tasks_count = self.running_tasks_count - 1

                if can_be_run:
                    for m in wrapper.mutexes:
                        self.task_mutexes [ m ] = False

                self.guard.notify_all()

    def process_queue(self):
        """
        Queue processor: repeatedly picks a ready task and hands it over
        to the executor. Returns once termination was requested and the
        queue is empty.
        """
        while True:
            with self.guard:
                # Break inside the loop
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return

                    self.guard.wait()

                # We have some tasks in the queue and we can run at
                # least one of them
                ( ready_task_id, can_be_run ) = self.get_first_ready_task_id_()

                if ready_task_id is None:
                    self.guard.wait()
                else:
                    # Remove the task from the queue
                    self.queue.remove(ready_task_id)

                    ready_task = self.tasks[ready_task_id]

                    # Need to update number of running tasks here and now
                    # because the executor might start the execution later
                    # and we would evaluate incorrectly the condition above
                    # that we can start another task.
                    self.running_tasks_count = self.running_tasks_count + 1

                    if can_be_run:
                        for m in ready_task.mutexes:
                            self.task_mutexes [ m ] = True
                    self.executor.submit(BuildScheduler.task_run_wrapper,
                        self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return tuple of first task that can be run (or failed immediately)
        with note whether the result is predetermined.
        Returns (None, None) when no task can be run.
        """
        # We assume self.guard was already acquired
        # We use here the for ... else construct of Python (recall that else
        # is taken when break is not used)
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[ task_dep_id ]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return ( task_id, False )
            else:
                for task_mutex in task.mutexes:
                    if (task_mutex in self.task_mutexes) and self.task_mutexes[ task_mutex ]:
                        break
                else:
                    return ( task_id, True )
        return ( None, None )

    def announce_task_started_(self, task):
        """Print message about task (a task wrapper) being started."""
        self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        """Print message about the result of task (a task wrapper)."""
        description = task.description
        if task.status == 'ok':
            msg = 'done'
            msg_color = self.printer.GREEN
            reason = None
        elif task.status == 'skip':
            msg = 'skip'
            msg_color = self.printer.CYAN
            reason = task.reason
        else:
            msg = 'fail'
            msg_color = self.printer.RED
            reason = task.reason

        # A failure does not have to come with an explanation (e.g. when
        # the task merely returned False).
        if reason is None:
            description = description + '.'
        else:
            description = description + ': ' + reason

        self.printer.print_finished(msg_color, msg, description)

    def barrier(self):
        """Block until no task is running and the queue is empty."""
        with self.guard:
            while (self.running_tasks_count > 0) or (len(self.queue) > 0):
                self.guard.wait()

    def done(self):
        """
        Wait for all tasks, close the report and shut the executor down.
        """
        with self.guard:
            self.terminate = True
            self.guard.notify_all()
        self.barrier()
        self.close_report()
        self.executor.shutdown(True)

    def close_report(self):
        """Write the build summary and close the report (only once)."""
        if self.report_file is not None:
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, ( end_time - self.start_timestamp ) * 1000,
                self.max_workers
            ))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None