4 # Copyright (c) 2017 Vojtech Horky
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 from threading
import Lock
, Condition
# Constructor fragment: the enclosing class header and lines 42-47 are not
# visible here. It takes a report tag and arbitrary keyword arguments; the
# visible assignment copies report_args[k] into self.report['attrs'][k]
# (the loop that binds k is on a missing line).
# NOTE(review): report_tag presumably ends up as self.report['name'], which
# the scheduler uses as the XML element name - confirm.
41 def __init__(self
, report_tag
, **report_args
):
48 self
.report
['attrs'][k
] = report_args
[k
]
# Fragment of a method whose header is not visible (presumably the task's
# execute() - TODO confirm). Visible behaviour:
# - It records start_time.
# - It raises Exception('run() returned False') under a condition that sits
#   on a missing line.
# - On success it sets self.report['result'] = 'ok', stores
#   self.ctl.get_files() in self.report['files'], and stores the elapsed
#   wall-clock time in milliseconds in self.report['attrs']['duration'].
# - The `except Exception` handler records the same duration and files but
#   sets the result to 'fail'. What it does with `e` is not visible.
52 start_time
= time
.time()
56 raise Exception('run() returned False')
57 self
.report
['result'] = 'ok'
59 self
.report
['files'] = self
.ctl
.get_files()
61 end_time
= time
.time()
62 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
73 except Exception as e
:
74 end_time
= time
.time()
75 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
77 self
.report
['result'] = 'fail'
78 self
.report
['files'] = self
.ctl
.get_files()
class TaskException(Exception):
    """Base class for errors reported by build tasks.

    The single ``msg`` argument becomes the exception message (``str(e)``
    and ``e.args``).
    """

    def __init__(self, msg):
        super().__init__(msg)
# Exception for a failed external command. run_command raises it with a
# message of the form "`<cmd>' failed: <last output line>".
# The constructor takes msg, rc and output. The visible lines only forward
# msg to Exception.__init__; lines 95-96 (presumably storing rc and output
# on self - TODO confirm) are not visible.
# NOTE(review): it calls Exception.__init__ directly, bypassing
# TaskException.__init__.
93 class RunCommandException(TaskException
):
94 def __init__(self
, msg
, rc
, output
):
97 Exception.__init
__(self
, msg
)
# Constructor fragment, presumably TaskController.__init__: derive() and
# BuildScheduler build TaskController objects with this argument order.
# Lines 101-105 are not visible. The visible lines store the build and
# artefact directories, the printer and the number of log lines to keep.
# They also store the debug flag, print_debug, as self.print_debug_messages.
# NOTE(review): other methods read self.name, self.data, self.log and
# self.log_tail; presumably those are initialised on the missing lines.
100 def __init__(self
, name
, data
, build_directory
, artefact_directory
, printer
, kept_log_lines
, print_debug
= False):
106 self
.build_directory
= build_directory
107 self
.artefact_directory
= artefact_directory
108 self
.printer
= printer
109 self
.kept_log_lines
= kept_log_lines
110 self
.print_debug_messages
= print_debug
def derive(self, name, data):
    """Return a new controller for a sub-task.

    The child gets its own ``name`` and ``data``. It shares this controller's
    build directory, artefact directory, printer, log-tail length and debug
    flag.
    """
    inherited = (
        self.build_directory,
        self.artefact_directory,
        self.printer,
        self.kept_log_lines,
        self.print_debug_messages,
    )
    return TaskController(name, data, *inherited)
def dprint(self, str, *args):
    """Print a debug message tagged with this controller's name.

    Nothing is printed unless ``self.print_debug_messages`` is true.
    ``str`` is a %-style format string applied to ``args``. When no ``args``
    are given, the text is printed verbatim, so callers may pass
    already-formatted text (e.g. a command line) that contains '%'.
    The parameter keeps its historical name ``str`` (which shadows the
    builtin) for compatibility with keyword callers.
    """
    if not self.print_debug_messages:
        return
    # Formatting pre-formatted text with an empty tuple would raise on any
    # stray '%' (e.g. "date +%Y"), so only format when there are arguments.
    message = str % args if args else str
    self.printer.print_debug(self.name, message)
# Return data published by dependency `dep`.
# - Under a condition on missing line 122 (presumably `if key is None:` -
#   TODO confirm), it returns self.get_data(dep).
# - Otherwise it returns self.data[dep][key]; a missing dep or key raises
#   KeyError.
# NOTE(review): get_data() treats its argument as a data *key* and searches
# every dependency for it, not as a dependency id, so the first branch looks
# up the name `dep` as a key.
121 def get_dependency_data(self
, dep
, key
=None):
123 return self
.get_data(dep
)
124 return self
.data
[dep
][key
]
def get_data(self, key):
    """Return the value of ``key`` from the first dependency that has it.

    Dependencies are searched in the iteration order of ``self.data``.

    Raises:
        TaskException: when no dependency provides ``key``.
    """
    for published in self.data.values():
        if key in published:
            return published[key]
    raise TaskException("WARN: unknown key %s" % key)
# Run `cmd` (an argument list, joined with spaces for messages) in `cwd`,
# with stderr merged into stdout.
# - Each output line is decoded as UTF-8, stripped of newlines and appended
#   to the task log via append_line_to_log_file.
# - Lines 134-137, 143-148, 150, 153-156, 159 and 161 are not visible.
# - From the visible fragments, a failure (presumably a non-zero exit code)
#   raises RunCommandException("`<cmd>' failed: <last_line>").
# - A dict with 'stdout', 'stderr' and 'failed' (rc != 0) entries is also
#   built. Both 'stdout' and 'stderr' are '\n'.join(output) because the two
#   streams are merged.
# NOTE(review): needs_output is not referenced on the visible lines.
# NOTE(review): decode('utf-8') without errors= raises UnicodeDecodeError
# on non-UTF-8 output.
132 def run_command(self
, cmd
, cwd
=None, needs_output
=False):
133 self
.dprint("Running `%s'..." % ' '.join(cmd
))
138 # FIXME: can we keep stdout and stderr separated?
139 with subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, cwd
=cwd
) as proc
:
140 for line
in proc
.stdout
:
141 line
= line
.decode('utf-8').strip('\n')
142 self
.append_line_to_log_file(line
)
149 #self.dprint('stderr=%s' % stderr.strip('\n'))
151 raise RunCommandException(
152 "`%s' failed: %s" % (' '.join(cmd
), last_line
),
157 'stdout': '\n'.join(output
),
158 'stderr': '\n'.join(output
),
160 'failed': not rc
== 0
def make_temp_dir(self, name):
    """Ensure ``<build_directory>/<name>`` exists and return its absolute path.

    An already existing directory is reused rather than treated as an error.
    """
    path = '{}/{}'.format(self.build_directory, name)
    os.makedirs(path, exist_ok=True)
    return os.path.abspath(path)
def recursive_copy(self, src_dir, dest_dir):
    """Copy the contents of ``src_dir`` into ``dest_dir`` with ``rsync -a``.

    ``dest_dir`` is created first. The trailing '/' appended to the source
    makes rsync copy the directory's contents rather than the directory
    itself.
    """
    os.makedirs(dest_dir, exist_ok=True)
    source = src_dir + '/'
    rsync_cmd = ['rsync', '-a', source, dest_dir]
    self.run_command(rsync_cmd)
# Remove `path`, ignoring OSError. The try body (lines 173-174) and the
# handler body (line 176) are not visible; presumably they are a file
# removal and a no-op - TODO confirm.
172 def remove_silently(self
, path
):
175 except OSError as ex
:
# Recursively delete `path`. shutil.rmtree(path, True) passes
# ignore_errors=True, so rmtree already suppresses removal errors.
# NOTE(review): the surrounding `except OSError` (handler body on a missing
# line) is therefore mostly redundant.
178 def remove_silently_recursive(self
, path
):
180 shutil
.rmtree(path
, True)
181 except OSError as ex
:
def set_log_file(self, log_filename):
    """Open ``logs/<log_filename>`` in the artefact area as this task's log.

    The file is opened in 'w' mode (any previous content is truncated) and
    kept in ``self.log``.
    """
    # TODO: open the log lazily
    # TODO: propagate the information to XML report
    log_path = 'logs/' + log_filename
    self.log = self.open_downloadable_file(log_path, 'w')
def append_line_to_log_file(self, line):
    """Write ``line`` to the task log (if one is open) and remember it.

    Only the last ``self.kept_log_lines`` lines are kept in
    ``self.log_tail``; the scheduler embeds them in the XML report. A
    non-positive ``kept_log_lines`` keeps no lines.
    """
    if self.log is not None:
        self.log.write(line + '\n')
    self.log_tail.append(line)
    keep = self.kept_log_lines
    # A plain [-keep:] slice with keep == 0 is [0:] and would keep the
    # whole list, letting the tail grow without bound.
    self.log_tail = self.log_tail[-keep:] if keep > 0 else []
# Map a path relative to the artefact directory to an absolute path.
# - The directory part becomes '<artefact_directory>/<dirname>/' and the
#   basename is appended to it.
# - os.makedirs(dname, exist_ok=True) runs under a guard on missing lines
#   199-200 (presumably `if create_dirs:` - TODO confirm).
# - With an empty dirname the path contains a harmless '//', which abspath
#   normalises.
195 def get_artefact_absolute_path(self
, relative_name
, create_dirs
=False):
196 base
= os
.path
.dirname(relative_name
)
197 name
= os
.path
.basename(relative_name
)
198 dname
= '%s/%s/' % ( self
.artefact_directory
, base
)
201 os
.makedirs(dname
, exist_ok
=True)
203 return os
.path
.abspath(dname
+ name
)
# Copy `current_filename` into the artefact area as `download_name`, with
# parent directories created by get_artefact_absolute_path(..., True).
# A dict entry with 'filename': download_name is then built. Its other keys
# and destination are on missing lines; presumably it carries `title` and is
# recorded for get_files() - TODO confirm.
205 def add_downloadable_file(self
, title
, download_name
, current_filename
):
206 self
.dprint("Downloadable `%s' at %s", title
, download_name
)
208 target
= self
.get_artefact_absolute_path(download_name
, True)
209 shutil
.copy(current_filename
, target
)
212 'filename' : download_name
,
# Move the directory `current_dirname` into the artefact area as
# `download_name` using shutil.move. The source no longer exists
# afterwards. A dict entry with 'filename': download_name is then built;
# its other keys and destination are on missing lines - TODO confirm.
218 def move_dir_to_downloadable(self
, title
, download_name
, current_dirname
):
219 self
.dprint("Downloadable `%s' at %s", title
, download_name
)
221 target
= self
.get_artefact_absolute_path(download_name
, True)
222 shutil
.move(current_dirname
, target
)
225 'filename' : download_name
,
def open_downloadable_file(self, download_name, mode):
    """Open a file in the artefact area and return the file object.

    ``download_name`` is relative to the artefact directory, and missing
    parent directories are created.

    Text modes use UTF-8 explicitly, so log files and report.xml do not
    depend on the locale encoding. report.xml's XML declaration names no
    encoding, which XML treats as UTF-8. Binary modes are passed through
    unchanged (open() rejects an encoding for them).
    """
    path = self.get_artefact_absolute_path(download_name, True)
    if 'b' in mode:
        return open(path, mode)
    return open(path, mode, encoding='utf-8')
# The first lines below are the tail of a method whose header (line 234) is
# not visible; that code runs only when self.log is not None.
# ret(*args, **kwargs) builds a `result` dict. Its visible lines branch on
# `status == False` and copy each keyword argument k into
# result['data'][k]. Lines 242-246, 248-254 and the return are not visible.
235 if not self
.log
is None:
241 def ret(self
, *args
, **kwargs
):
247 elif status
== False:
255 result
['data'][k
] = kwargs
[k
]
# Constructor fragment, presumably TaskWrapper.__init__: submit() calls
# TaskWrapper(task_id, task, description, deps, mutexes) with this order.
# It stores the dependency ids, the description and the mutex names, and
# starts with completed = False.
# NOTE(review): the parameter `id` shadows the builtin. Lines 263, 266-267
# and 269-270 (presumably self.id, self.task and status fields; the
# scheduler reads wrapper.id and wrapper.task) are not visible.
262 def __init__(self
, id, task
, description
, deps
, mutexes
):
264 self
.dependencies
= deps
265 self
.description
= description
268 self
.completed
= False
271 self
.mutexes
= mutexes
# True only when the task has completed and its status is 'ok'
# (line 274 is not visible).
273 def has_completed_okay(self
):
275 return self
.completed
and (self
.status
== 'ok')
# True once the task has completed, regardless of its status
# (line 278 is not visible).
277 def has_finished(self
):
279 return self
.completed
# Body (lines 282-288) not visible. Presumably returns the status string:
# get_first_ready_task_id_ compares the result with 'ok'.
281 def get_status(self
):
# Body (lines 290-293) not visible. Presumably records status and reason;
# announce_task_finished_ later reads task.status and task.reason.
289 def set_status(self
, status
, reason
):
# Marks the task as completed (self.completed = True). How `data` is stored
# (lines 295-296) is not visible; presumably it is what get_data() returns
# to dependent tasks - TODO confirm.
294 def set_done(self
, data
):
297 self
.completed
= True
def set_skipped(self, reason):
    """Record that the task was skipped, with ``reason`` as explanation."""
    skip_status = 'skip'
    self.set_status(skip_status, reason)
# Schedules build tasks on a thread pool, honouring dependencies and named
# mutexes, and writes report.xml describing the results.
# __init__ (partially visible; several lines are missing) does the following:
# - records the start time and start date;
# - creates the parent TaskController named 'scheduler';
# - opens report.xml and writes the XML header and the
#   <build number="..."> element;
# - initialises the shared state guarded by self.guard (a Condition);
# - submits process_queue_wrapper to a ThreadPoolExecutor with
#   max_workers + 2 threads. Presumably the extra threads leave room for
#   the long-running queue processor, since process_queue itself limits
#   tasks to max_workers.
# NOTE(review): self.tasks and self.queue are presumably initialised on the
# missing lines 333-334 and 339-340.
303 class BuildScheduler
:
304 def __init__(self
, max_workers
, build
, artefact
, build_id
, printer
, inline_log_lines
= 10, debug
= False):
306 'build-directory': build
,
307 'artefact-directory': artefact
,
310 self
.printer
= printer
312 self
.start_timestamp
= time
.time()
313 self
.start_date
= datetime
.datetime
.now(datetime
.timezone
.utc
).replace(microsecond
=0).astimezone().isoformat(' ')
315 # Parent task controller
316 self
.ctl
= TaskController('scheduler', {}, build
, artefact
, self
.printer
, inline_log_lines
, debug
)
319 self
.report_file
= self
.ctl
.open_downloadable_file('report.xml', 'w')
320 self
.report_file
.write("<?xml version=\"1.0\"?>\n")
321 self
.report_file
.write("<build number=\"%s\">\n" % build_id
)
323 # The following attributes (up to self.guard declaration) are guarded
324 # by the self.guard mutex and use self.guard (a Condition) to notify about changes
326 # Lower granularity of the locking would be possible but would
327 # complicate too much the conditions inside queue processing where we
328 # need to react to multiple events (new task added vs some task
329 # terminated vs selecting the right task to be executed).
331 # Known tasks (regardless of their state). The mapping is between
332 # a task id and TaskWrapper class.
335 # Queue of tasks not yet run (uses task ids only). We insert mutex
336 # tasks before the first queued task without mutexes (instead of appending) as a heuristic to
337 # prevent accumulation of mutex tasks at the end of run where it could
338 # hurt concurrent execution.
341 # Number of currently running (executing) tasks. Used solely to
342 # control number of concurrently running tasks.
343 self
.running_tasks_count
= 0
345 # Flag for the queue processing whether to terminate the loop to allow
346 # clean termination of the executor.
347 self
.terminate
= False
349 # Here we record which mutexes are held by executing tasks. Mutexes are
350 # identified by their (string) name that is used as index. When the
351 # value is True, the mutex is held (i.e. do not run any other task
352 # claiming the same mutex), mutex is not held when the value is False
353 # or when the key is not present at all.
354 self
.task_mutexes
= {}
356 # Condition variable guarding the above attributes.
357 # We initialize CV only without attaching a lock as it creates one
358 # automatically and CV serves as a lock too.
360 # Always use notify_all as we are waiting in multiple functions
361 # for it (e.g. while processing the queue or in barrier).
362 self
.guard
= Condition()
364 # Lock guarding output synchronization
365 self
.output_lock
= Lock()
367 # Executor for running of individual tasks
368 from concurrent
.futures
import ThreadPoolExecutor
369 self
.max_workers
= max_workers
370 self
.executor
= ThreadPoolExecutor(max_workers
=max_workers
+ 2)
372 # Start the queue processor
373 self
.executor
.submit(BuildScheduler
.process_queue_wrapper
, self
)
# Executor entry point for the queue processor. It prints the traceback of
# any exception with traceback.print_exc(); otherwise, an exception raised
# inside the executor would only be stored in the unused Future. The call
# itself (presumably self.process_queue()) and the try/except are on
# missing lines.
375 def process_queue_wrapper(self
):
377 To allow debugging of the queue processor.
383 traceback
.print_exc()
# Register a task and enqueue it.
# - Raises Exception when a dependency id is not already known.
# - A task without mutexes is appended to the queue.
# - A task with mutexes is inserted before the first queued task that has
#   none, or appended if there is no such task.
# - Finally, all waiters on self.guard are notified.
# NOTE(review): deps=[] and mutexes=[] are mutable defaults stored directly
# in the TaskWrapper and shared by every task submitted without them. This
# is safe only while nobody mutates those lists; consider None defaults.
# NOTE(review): acquisition of self.guard is presumably on missing line 389.
385 def submit(self
, description
, task_id
, task
, deps
= [], mutexes
= []):
387 #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
388 # Check that dependencies are known
390 if not d
in self
.tasks
:
391 raise Exception('Dependency %s is not known.' % d
)
393 wrapper
= TaskWrapper(task_id
, task
, description
, deps
, mutexes
)
394 self
.tasks
[task_id
] = wrapper
396 # Append to the queue
397 # We use a simple heuristic: if the task has no mutexes, we
398 # append to the end of the queue. Otherwise we prioritize the
399 # task a little bit to prevent ending with serialized execution
400 # of the mutually excluded tasks. (We add before first non-mutexed
406 if (len(self
.tasks
[q
].mutexes
) == 0) and (not inserted
):
407 new_queue
.append(task_id
)
411 new_queue
.append(task_id
)
412 self
.queue
= new_queue
414 self
.queue
.append(task_id
)
416 self
.guard
.notify_all()
# Executor entry point for a single task: calls task_run_inner and prints
# the traceback of any exception (the try/except lines are not visible).
418 def task_run_wrapper(self
, wrapper
, task_id
, can_be_run
):
420 self
.task_run_inner(wrapper
, task_id
, can_be_run
)
423 traceback
.print_exc()
# Make a log line safe to embed as XML text:
# 1. drop control characters \x00-\x08 and \x0A-\x1F (tab \x09 is kept);
# 2. escape &, < and > with xml.sax.saxutils.escape;
# 3. turn every non-ASCII character into a numeric character reference.
# NOTE(review): `re` is not imported on the visible lines; presumably the
# missing line 427 imports it.
425 def xml_escape_line(self
, s
):
426 from xml
.sax
.saxutils
import escape
429 s_without_ctrl
= re
.sub(r
'[\x00-\x08\x0A-\x1F]', '', s
)
430 s_escaped
= escape(s_without_ctrl
)
431 s_all_entities_encoded
= s_escaped
.encode('ascii', 'xmlcharrefreplace')
433 return s_all_entities_encoded
.decode('utf8')
# Run one task in an executor thread and record its outcome. Visible flow:
# 1. Collect each dependency's get_data() into `data`, derive a task
#    controller, open logs/<task_id>.log and announce the start.
# 2. Either execute the task (res True/None is presumably success), or,
#    when a dependency finished unsuccessfully, log "Skipped: <reason>".
# 3. If the report has a name and report.xml is open, serialise it as one
#    XML element containing:
#    - its attrs plus result and log path;
#    - a <file> entry per file;
#    - the kept log tail, each line passed through xml_escape_line.
#    Write that element to report.xml.
# 4. Update the wrapper's status and data, announce the finish, decrement
#    running_tasks_count, release the task's mutexes and notify all
#    waiters on self.guard.
# NOTE(review): attribute values and file title/filename are interpolated
# without escaping; a '"', '<' or '&' in them produces malformed XML.
# NOTE(review): many lines are not visible, including where self.guard is
# acquired around the counter/mutex updates.
435 def task_run_inner(self
, wrapper
, task_id
, can_be_run
):
439 for task_dep_id
in wrapper
.dependencies
:
440 task_dep
= self
.tasks
[task_dep_id
]
441 data
[task_dep_id
] = task_dep
.get_data()
443 wrapper
.task
.ctl
= self
.ctl
.derive(task_id
, data
)
444 wrapper
.task
.ctl
.set_log_file('%s.log' % task_id
)
447 self
.announce_task_started_(wrapper
)
450 res
= wrapper
.task
.execute()
451 if (res
== True) or (res
is None):
462 except Exception as e
:
468 #traceback.print_exc()
471 for task_dep_id
in wrapper
.dependencies
:
472 task_dep
= self
.tasks
[task_dep_id
]
473 if task_dep
.has_finished() and (not task_dep
.has_completed_okay()):
474 reason
= 'dependency %s failed (or also skipped).' % task_dep_id
479 wrapper
.task
.ctl
.append_line_to_log_file('Skipped: %s' % reason
)
481 status
= res
['status']
482 report
= wrapper
.task
.get_report()
484 if (not report
['name'] is None) and (not self
.report_file
is None):
485 report_xml
= '<' + report
['name']
486 report
['attrs']['result'] = status
487 for key
in report
['attrs']:
488 report_xml
= report_xml
+ ' %s="%s"' % (key
, report
['attrs'][key
] )
489 report_xml
= report_xml
+ ' log="logs/%s.log"' % wrapper
.id
490 report_xml
= report_xml
+ ">\n"
492 if 'files' in report
:
493 for f
in report
['files']:
494 file = '<file title="%s" filename="%s" />\n' % ( f
['title'], f
['filename'])
495 report_xml
= report_xml
+ file
497 if (not wrapper
.task
.ctl
is None) and (len(wrapper
.task
.ctl
.log_tail
) > 0):
498 report_xml
= report_xml
+ ' <log>\n'
499 for line
in wrapper
.task
.ctl
.log_tail
:
500 report_xml
= report_xml
+ ' <logline>' + self
.xml_escape_line(line
) + '</logline>\n'
501 report_xml
= report_xml
+ ' </log>\n'
503 report_xml
= report_xml
+ '</' + report
['name'] + ">\n"
506 self
.report_file
.write(report_xml
)
508 wrapper
.set_status(status
, reason
)
509 self
.announce_task_finished_(wrapper
)
510 wrapper
.set_done(res
['data'])
513 self
.running_tasks_count
= self
.running_tasks_count
- 1
516 for m
in wrapper
.mutexes
:
517 self
.task_mutexes
[ m
] = False
519 #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
520 self
.guard
.notify_all()
# Queue-processor loop, run inside the executor. While holding self.guard
# (acquired on a line that is not visible), each iteration:
# - checks whether a worker slot and a queued task are available;
# - stops (presumably breaks) once self.terminate is set and the queue is
#   empty, and otherwise waits on missing lines;
# - picks the first ready task via get_first_ready_task_id_ and removes it
#   from the queue;
# - increments running_tasks_count before submitting, marks the task's
#   mutexes as held, and submits task_run_wrapper to the executor.
523 def process_queue(self
):
526 #print("Process queue running, tasks {}".format(len(self.queue)))
527 # Break inside the loop
529 slot_available
= self
.running_tasks_count
< self
.max_workers
530 task_available
= len(self
.queue
) > 0
531 #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
532 if slot_available
and task_available
:
534 if self
.terminate
and (not task_available
):
537 #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
539 #print("Guard woken-up after waiting for free slots.")
541 # We have some tasks in the queue and we can run at
543 ( ready_task_id
, can_be_run
) = self
.get_first_ready_task_id_()
544 #print("Ready task is {}".format(ready_task_id))
546 if ready_task_id
is None:
547 #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
549 #print("Guard woken-up after no ready task.")
551 # Remove the task from the queue
552 self
.queue
.remove(ready_task_id
)
554 ready_task
= self
.tasks
[ready_task_id
]
556 # Need to update number of running tasks here and now
557 # because the executor might start the execution later
558 # and we would evaluate incorrectly the condition above
559 # that we can start another task.
560 self
.running_tasks_count
= self
.running_tasks_count
+ 1
562 #print("Ready is {}".format(ready_task))
564 for m
in ready_task
.mutexes
:
565 self
.task_mutexes
[ m
] = True
566 #print("Actually starting task {}".format(ready_task_id))
567 self
.executor
.submit(BuildScheduler
.task_run_wrapper
,
568 self
, ready_task
, ready_task_id
, can_be_run
)
# Scan the queue in order and return (task_id, can_be_run) for the first
# task that is ready.
# - A task with a finished dependency whose status is not 'ok' is returned
#   at once as (task_id, False), so it can be skipped.
# - A task whose dependencies have finished and whose mutexes are not held
#   is returned as (task_id, True). The break/continue lines of this check
#   are not visible.
# - Returns (None, None) when nothing is ready.
570 def get_first_ready_task_id_(self
):
572 Return tuple of first task that can be run (or failed immediately)
573 with note whether the result is predetermined.
574 Returns (None, None) when no task can be run.
576 # We assume self.guard was already acquired
577 # We use here the for ... else construct of Python (recall that else
578 # is taken when break is not used)
579 for task_id
in self
.queue
:
580 task
= self
.tasks
[task_id
]
581 for task_dep_id
in task
.dependencies
:
582 task_dep
= self
.tasks
[ task_dep_id
]
583 if not task_dep
.has_finished():
585 # Failed dependency means we can return now
586 if task_dep
.get_status() != 'ok':
587 return ( task_id
, False )
589 for task_mutex
in task
.mutexes
:
590 if (task_mutex
in self
.task_mutexes
) and self
.task_mutexes
[ task_mutex
]:
593 return ( task_id
, True )
594 return ( None, None )
def announce_task_started_(self, task):
    """Tell the printer that ``task`` is starting ("<description> ...")."""
    starting_line = " ".join((task.description, "..."))
    self.printer.print_starting(starting_line)
# Print the result line for a finished task:
# - GREEN, description + '.', for status 'ok';
# - CYAN, description + ': <reason>', for 'skip';
# - RED, description + ': <reason>', otherwise (presumably the else branch
#   on missing lines 609-610).
# The `msg` label passed to print_finished is set on lines that are not
# visible.
599 def announce_task_finished_(self
, task
):
600 description
= task
.description
601 if task
.status
== 'ok':
603 msg_color
= self
.printer
.GREEN
604 description
= description
+ '.'
605 elif task
.status
== 'skip':
607 msg_color
= self
.printer
.CYAN
608 description
= description
+ ': ' + task
.reason
611 msg_color
= self
.printer
.RED
612 description
= description
+ ': ' + task
.reason
614 self
.printer
.print_finished(msg_color
, msg
, description
)
# Fragments of methods whose headers are not visible (presumably a barrier
# and a termination method - TODO confirm).
# - The first part loops while tasks are running or still queued; its wait
#   call is on missing lines.
# - The second part sets self.terminate and notifies every waiter on
#   self.guard so the queue processor can observe the flag. It then calls
#   self.executor.shutdown(True), which blocks until submitted work has
#   finished.
619 #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
620 while (self
.running_tasks_count
> 0) or (len(self
.queue
) > 0):
621 #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
626 self
.terminate
= True
627 self
.guard
.notify_all()
630 self
.executor
.shutdown(True)
# Finish report.xml if it is still open:
# - append a <buildinfo> element with the start date, the milliseconds
#   elapsed since the scheduler was created, and a parallelism value (its
#   argument is on missing lines 637-638);
# - close the <build> element and close the file;
# - set self.report_file to None, so later calls do nothing.
632 def close_report(self
):
633 if not self
.report_file
is None:
634 end_time
= time
.time()
635 self
.report_file
.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
636 self
.start_date
, ( end_time
- self
.start_timestamp
) * 1000,
639 self
.report_file
.write("</build>\n")
640 self
.report_file
.close()
641 self
.report_file
= None