# Copyright (c) 2017 Vojtech Horky
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# - Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
# - The name of the author may not be used to endorse or promote products
#   derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import re
import time
import shutil
import datetime
import subprocess
import traceback

from threading import Lock, Condition

class Task:
    def __init__(self, report_tag, **report_args):
        self.report = {
            'name': report_tag,
            'attrs': {},
        }
        for k in report_args:
            self.report['attrs'][k] = report_args[k]

    def get_report(self):
        return self.report

    def execute(self):
        start_time = time.time()
        try:
            if self.run() == False:
                raise Exception('run() returned False')
            self.report['result'] = 'ok'
            self.report['files'] = self.ctl.get_files()

            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000
        except Exception as e:
            end_time = time.time()
            self.report['attrs']['duration'] = (end_time - start_time) * 1000

            self.report['result'] = 'fail'
            self.report['files'] = self.ctl.get_files()
            raise e

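# A minimal sketch of a concrete task (illustrative only, not part of the
# original file): EchoTask and its arguments are assumptions. It relies on the
# scheduler assigning self.ctl before execute() is called (see task_run_inner
# below) and on run() signalling failure by returning False or raising.
class EchoTask(Task):
    def __init__(self, message):
        # 'echo' becomes the XML element name of the report, message an attribute
        Task.__init__(self, 'echo', message=message)
        self.message = message

    def run(self):
        # run_command() logs the output and raises RunCommandException on a
        # non-zero exit code, which execute() turns into a 'fail' result
        self.ctl.run_command(['echo', self.message])
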
class TaskException(Exception):
    def __init__(self, msg):
        Exception.__init__(self, msg)

class RunCommandException(TaskException):
    def __init__(self, msg, rc, output):
        self.rc = rc
        self.output = output
        Exception.__init__(self, msg)

class TaskController:
    def __init__(self, name, data, build_directory, artefact_directory,
            printer, kept_log_lines, print_debug=False):
        self.name = name
        self.data = data
        self.files = []
        self.log = None
        self.log_tail = []
        self.build_directory = build_directory
        self.artefact_directory = artefact_directory
        self.printer = printer
        self.kept_log_lines = kept_log_lines
        self.print_debug_messages = print_debug

    def derive(self, name, data):
        return TaskController(name, data, self.build_directory, self.artefact_directory,
            self.printer, self.kept_log_lines, self.print_debug_messages)

    def dprint(self, str, *args):
        if self.print_debug_messages:
            self.printer.print_debug(self.name, str % args)

    def get_dependency_data(self, dep, key=None):
        if key is None:
            return self.get_data(dep)
        return self.data[dep][key]

    def get_data(self, key):
        for dep in self.data:
            if key in self.data[dep]:
                return self.data[dep][key]
        raise TaskException("WARN: unknown key %s" % key)

    def run_command(self, cmd, cwd=None, needs_output=False):
        self.dprint("Running `%s'..." % ' '.join(cmd))
        output = []
        last_line = ''
        # FIXME: can we keep stdout and stderr separated?
        with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=cwd) as proc:
            for line in proc.stdout:
                line = line.decode('utf-8').strip('\n')
                self.append_line_to_log_file(line)
                if needs_output:
                    output.append(line)
                last_line = line
            rc = proc.wait()
        #self.dprint('stderr=%s' % stderr.strip('\n'))
        if rc != 0:
            raise RunCommandException(
                "`%s' failed: %s" % (' '.join(cmd), last_line),
                rc, output)
        return {
            'stdout': '\n'.join(output),
            'stderr': '\n'.join(output),
            'failed': rc != 0,
        }

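    # Illustrative usage sketch (not from the original file): a task's run()
    # might invoke run_command() through its controller and consume the
    # returned dictionary; the command and variable names below are made up.
    #
    #   res = self.ctl.run_command(['git', 'rev-parse', 'HEAD'],
    #       cwd=checkout_dir, needs_output=True)
    #   if not res['failed']:
    #       head_commit = res['stdout'].strip()
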
    def make_temp_dir(self, name):
        dname = '%s/%s' % (self.build_directory, name)
        os.makedirs(dname, exist_ok=True)
        return os.path.abspath(dname)

    def recursive_copy(self, src_dir, dest_dir):
        os.makedirs(dest_dir, exist_ok=True)
        self.run_command(['rsync', '-a', src_dir + '/', dest_dir])

    def remove_silently(self, path):
        try:
            os.remove(path)
        except OSError as ex:
            pass

    def set_log_file(self, log_filename):
        # TODO: open the log lazily
        # TODO: propagate the information to XML report
        self.log = self.open_downloadable_file('logs/' + log_filename, 'w')

    def append_line_to_log_file(self, line):
        if self.log is not None:
            self.log.write(line + '\n')
        self.log_tail.append(line)
        self.log_tail = self.log_tail[-self.kept_log_lines:]

    def get_artefact_absolute_path(self, relative_name, create_dirs=False):
        base = os.path.dirname(relative_name)
        name = os.path.basename(relative_name)
        dname = '%s/%s/' % (self.artefact_directory, base)

        if create_dirs:
            os.makedirs(dname, exist_ok=True)

        return os.path.abspath(dname + name)

    # TODO: propagate title + download_name to the report
    def add_downloadable_file(self, title, download_name, current_filename):
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'title': title,
            'filename': download_name,
        })
        target = self.get_artefact_absolute_path(download_name, True)
        shutil.copy(current_filename, target)

    def move_dir_to_downloadable(self, title, download_name, current_dirname):
        self.dprint("Downloadable `%s' at %s", title, download_name)
        self.files.append({
            'title': title,
            'filename': download_name,
        })
        target = self.get_artefact_absolute_path(download_name, True)
        shutil.move(current_dirname, target)

    def get_files(self):
        return self.files

    def open_downloadable_file(self, download_name, mode):
        return open(self.get_artefact_absolute_path(download_name, True), mode)

    def close_log(self):
        # close the log file once the task is finished
        if self.log is not None:
            self.log.close()
            self.log = None

    def ret(self, *args, **kwargs):
        status = 'ok'
        if len(args) == 1:
            status = args[0]
            if status == True:
                status = 'ok'
            elif status == False:
                status = 'fail'
        result = { 'status': status, 'data': {} }
        for k in kwargs:
            result['data'][k] = kwargs[k]
        return result

class TaskWrapper:
    def __init__(self, id, task, description, deps, mutexes):
        self.id = id
        self.task = task
        self.dependencies = deps
        self.description = description
        self.completed = False
        self.status = 'unknown'
        self.reason = None
        self.data = None
        self.mutexes = mutexes

    def has_completed_okay(self):
        return self.completed and (self.status == 'ok')

    def has_finished(self):
        return self.completed

    def get_status(self):
        return self.status

    def get_data(self):
        return self.data

    def set_status(self, status, reason):
        self.status = status
        self.reason = reason

    def set_done(self, data):
        self.data = data
        self.completed = True

    def set_skipped(self, reason):
        self.set_status('skip', reason)

class BuildScheduler:
    def __init__(self, max_workers, build, artefact, build_id, printer,
            inline_log_lines=10, debug=False):
        self.config = {
            'build-directory': build,
            'artefact-directory': artefact,
        }

        self.printer = printer

        self.start_timestamp = time.time()
        self.start_date = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).astimezone().isoformat(' ')

        # Parent task controller
        self.ctl = TaskController('scheduler', {}, build, artefact, self.printer,
            inline_log_lines, debug)

        self.report_file = self.ctl.open_downloadable_file('report.xml', 'w')
        self.report_file.write("<?xml version=\"1.0\"?>\n")
        self.report_file.write("<build number=\"%s\">\n" % build_id)

        # The following attributes (up to self.guard declaration) are guarded
        # by the self.guard mutex and use self.cond to notify about changes.
        # Lower granularity of the locking would be possible but would
        # complicate too much the conditions inside queue processing where we
        # need to react to multiple events (new task added vs some task
        # terminated vs selecting the right task to be executed).

        # Known tasks (regardless of their state). The mapping is between
        # a task id and TaskWrapper class.
        self.tasks = {}

        # Queue of tasks not yet run (uses task ids only). We insert mutex
        # tasks at queue beginning (instead of appending) as a heuristic to
        # prevent accumulation of mutex tasks at the end of run where it could
        # hurt concurrent execution.
        self.queue = []

        # Number of currently running (executing) tasks. Used solely to
        # control number of concurrently running tasks.
        self.running_tasks_count = 0

        # Flag for the queue processing whether to terminate the loop to allow
        # clean termination of the executor.
        self.terminate = False

        # Here we record which mutexes are held by executing tasks. Mutexes are
        # identified by their (string) name that is used as index. When the
        # value is True, the mutex is held (i.e. do not run any other task
        # claiming the same mutex), mutex is not held when the value is False
        # or when the key is not present at all.
        self.task_mutexes = {}

        # Condition variable guarding the above attributes.
        # We initialize CV only without attaching a lock as it creates one
        # automatically and CV serves as a lock too.
        #
        # Always use notify_all as we are waiting in multiple functions
        # for it (e.g. while processing the queue or in barrier).
        self.guard = Condition()

        # Lock guarding output synchronization
        self.output_lock = Lock()

        # Executor for running of individual tasks
        from concurrent.futures import ThreadPoolExecutor
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers + 2)

        # Start the queue processor
        self.executor.submit(BuildScheduler.process_queue_wrapper, self)

    def process_queue_wrapper(self):
        """
        To allow debugging of the queue processor.
        """
        try:
            self.process_queue()
        except:
            traceback.print_exc()

    def submit(self, description, task_id, task, deps=[], mutexes=[]):
        with self.guard:
            #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
            # Check that dependencies are known
            for d in deps:
                if d not in self.tasks:
                    raise Exception('Dependency %s is not known.' % d)

            wrapper = TaskWrapper(task_id, task, description, deps, mutexes)
            self.tasks[task_id] = wrapper

            # Append to the queue
            # We use a simple heuristic: if the task has no mutexes, we
            # append to the end of the queue. Otherwise we prioritize the
            # task a little bit to prevent ending with serialized execution
            # of the mutually excluded tasks. (We add before first non-mutexed
            # task.)
            if len(mutexes) > 0:
                new_queue = []
                inserted = False
                for q in self.queue:
                    if (len(self.tasks[q].mutexes) == 0) and (not inserted):
                        new_queue.append(task_id)
                        inserted = True
                    new_queue.append(q)
                if not inserted:
                    new_queue.append(task_id)
                self.queue = new_queue
            else:
                self.queue.append(task_id)

            self.guard.notify_all()

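    # Illustrative sketch of the intended calling convention (CheckoutTask,
    # BuildTask and the ids below are assumptions, not part of this file):
    #
    #   scheduler.submit("Checking out sources", "checkout", CheckoutTask())
    #   scheduler.submit("Building ia32", "build-ia32", BuildTask('ia32'),
    #       deps=['checkout'], mutexes=['build-dir'])
    #
    # "build-ia32" is started only after "checkout" has finished and never
    # concurrently with another task holding the 'build-dir' mutex.
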
    def task_run_wrapper(self, wrapper, task_id, can_be_run):
        try:
            self.task_run_inner(wrapper, task_id, can_be_run)
        except:
            traceback.print_exc()

    def xml_escape_line(self, s):
        from xml.sax.saxutils import escape

        s_without_ctrl = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
        s_escaped = escape(s_without_ctrl)
        s_all_entities_encoded = s_escaped.encode('ascii', 'xmlcharrefreplace')

        return s_all_entities_encoded.decode('utf8')

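    # For example, the log line
    #   warning: size < 10 & name is "Čech"
    # would be emitted into the report as
    #   warning: size &lt; 10 &amp; name is "&#268;ech"
    # (control characters are dropped, '&', '<' and '>' are escaped, and
    # non-ASCII characters become numeric character references).
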
    def task_run_inner(self, wrapper, task_id, can_be_run):
        data = {}
        for task_dep_id in wrapper.dependencies:
            task_dep = self.tasks[task_dep_id]
            data[task_dep_id] = task_dep.get_data()

        wrapper.task.ctl = self.ctl.derive(task_id, data)
        wrapper.task.ctl.set_log_file('%s.log' % task_id)

        if can_be_run:
            self.announce_task_started_(wrapper)

            try:
                res = wrapper.task.execute()
                if (res == True) or (res is None):
                    res = { 'status': 'ok', 'data': {} }
                reason = None
            except Exception as e:
                res = { 'status': 'fail', 'data': {} }
                reason = '%s' % e
                #traceback.print_exc()
        else:
            reason = 'dependency failed (or was skipped).'
            for task_dep_id in wrapper.dependencies:
                task_dep = self.tasks[task_dep_id]
                if task_dep.has_finished() and (not task_dep.has_completed_okay()):
                    reason = 'dependency %s failed (or also skipped).' % task_dep_id
                    break

            res = { 'status': 'skip', 'data': {} }
            wrapper.task.ctl.append_line_to_log_file('Skipped: %s' % reason)

        status = res['status']
        report = wrapper.task.get_report()

        if (report['name'] is not None) and (self.report_file is not None):
            report_xml = '<' + report['name']
            report['attrs']['result'] = status
            for key in report['attrs']:
                report_xml = report_xml + ' %s="%s"' % (key, report['attrs'][key])
            report_xml = report_xml + ' log="logs/%s.log"' % wrapper.id
            report_xml = report_xml + ">\n"

            if 'files' in report:
                for f in report['files']:
                    file = '<file title="%s" filename="%s" />\n' % (f['title'], f['filename'])
                    report_xml = report_xml + file

            if (wrapper.task.ctl is not None) and (len(wrapper.task.ctl.log_tail) > 0):
                report_xml = report_xml + ' <log>\n'
                for line in wrapper.task.ctl.log_tail:
                    report_xml = report_xml + ' <logline>' + self.xml_escape_line(line) + '</logline>\n'
                report_xml = report_xml + ' </log>\n'

            report_xml = report_xml + '</' + report['name'] + ">\n"

            with self.output_lock:
                self.report_file.write(report_xml)

        with self.guard:
            wrapper.set_status(status, reason)
            self.announce_task_finished_(wrapper)
            wrapper.set_done(res['data'])

            self.running_tasks_count = self.running_tasks_count - 1

            # Return the mutexes held by this task.
            for m in wrapper.mutexes:
                self.task_mutexes[m] = False

            #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
            self.guard.notify_all()

    def process_queue(self):
        with self.guard:
            while True:
                #print("Process queue running, tasks {}".format(len(self.queue)))
                # Break inside the loop
                while True:
                    slot_available = self.running_tasks_count < self.max_workers
                    task_available = len(self.queue) > 0
                    #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
                    if slot_available and task_available:
                        break
                    if self.terminate and (not task_available):
                        return
                    #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after waiting for free slots.")

                # We have some tasks in the queue and we can run at
                # least one of them.
                ( ready_task_id, can_be_run ) = self.get_first_ready_task_id_()
                #print("Ready task is {}".format(ready_task_id))

                if ready_task_id is None:
                    #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
                    self.guard.wait()
                    #print("Guard woken-up after no ready task.")
                    continue

                # Remove the task from the queue
                self.queue.remove(ready_task_id)

                ready_task = self.tasks[ready_task_id]

                # Need to update number of running tasks here and now
                # because the executor might start the execution later
                # and we would evaluate incorrectly the condition above
                # that we can start another task.
                self.running_tasks_count = self.running_tasks_count + 1

                #print("Ready is {}".format(ready_task))
                for m in ready_task.mutexes:
                    self.task_mutexes[m] = True
                #print("Actually starting task {}".format(ready_task_id))
                self.executor.submit(BuildScheduler.task_run_wrapper,
                    self, ready_task, ready_task_id, can_be_run)

    def get_first_ready_task_id_(self):
        """
        Return tuple of first task that can be run (or failed immediately)
        with note whether the result is predetermined.
        Returns None when no task can be run.
        """
        # We assume self.guard was already acquired
        # We use here the for ... else construct of Python (recall that else
        # is taken when break is not used)
        for task_id in self.queue:
            task = self.tasks[task_id]
            for task_dep_id in task.dependencies:
                task_dep = self.tasks[task_dep_id]
                if not task_dep.has_finished():
                    break
                # Failed dependency means we can return now
                if task_dep.get_status() != 'ok':
                    return ( task_id, False )
            else:
                for task_mutex in task.mutexes:
                    if (task_mutex in self.task_mutexes) and self.task_mutexes[task_mutex]:
                        break
                else:
                    return ( task_id, True )
        return ( None, None )

    def announce_task_started_(self, task):
        self.printer.print_starting(task.description + " ...")

    def announce_task_finished_(self, task):
        description = task.description
        if task.status == 'ok':
            msg = 'ok'
            msg_color = self.printer.GREEN
            description = description + '.'
        elif task.status == 'skip':
            msg = 'skip'
            msg_color = self.printer.CYAN
            description = description + ': ' + task.reason
        else:
            msg = 'fail'
            msg_color = self.printer.RED
            description = description + ': ' + task.reason

        self.printer.print_finished(msg_color, msg, description)

612 #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
613 while (self
.running_tasks_count
> 0) or (len(self
.queue
) > 0):
614 #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
    def done(self):
        with self.guard:
            self.terminate = True
            self.guard.notify_all()

        self.executor.shutdown(True)

    def close_report(self):
        if self.report_file is not None:
            end_time = time.time()
            self.report_file.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
                self.start_date, ( end_time - self.start_timestamp ) * 1000,
                self.max_workers))
            self.report_file.write("</build>\n")
            self.report_file.close()
            self.report_file = None
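
# A rough end-to-end sketch of how the scheduler is meant to be driven
# (illustrative only; the printer object, the task classes and the method
# name done() as reconstructed above are assumptions, not a documented API):
#
#   scheduler = BuildScheduler(max_workers=4, build='/tmp/build',
#       artefact='/tmp/artefacts', build_id='42', printer=printer)
#   scheduler.submit("Checking out sources", "checkout", CheckoutTask())
#   scheduler.submit("Building", "build", BuildTask(), deps=['checkout'])
#   scheduler.barrier()       # wait until the queue drains
#   scheduler.done()          # stop the queue processor and the executor
#   scheduler.close_report()  # finish and close report.xml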