4 # Copyright (c) 2017 Vojtech Horky
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
11 # - Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # - Redistributions in binary form must reproduce the above copyright
14 # notice, this list of conditions and the following disclaimer in the
15 # documentation and/or other materials provided with the distribution.
16 # - The name of the author may not be used to endorse or promote products
17 # derived from this software without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 # IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 # OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 # IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 # NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 # THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 from threading
import Lock
, Condition
41 def __init__(self
, report_tag
, **report_args
):
48 self
.report
['attrs'][k
] = report_args
[k
]
52 start_time
= time
.time()
56 raise Exception('run() returned False')
57 self
.report
['result'] = 'ok'
59 self
.report
['files'] = self
.ctl
.get_files()
61 end_time
= time
.time()
62 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
73 except Exception as e
:
74 end_time
= time
.time()
75 self
.report
['attrs']['duration'] = (end_time
- start_time
) * 1000
77 self
.report
['result'] = 'fail'
78 self
.report
['files'] = []
class TaskException(Exception):
    """Base class for task-related failures in this build tool.

    Carries a single human-readable message describing the failure.
    """

    def __init__(self, msg):
        # Use super() instead of calling Exception.__init__ directly so
        # cooperative multiple inheritance (MRO) is honored.
        super().__init__(msg)
93 class RunCommandException(TaskException
):
94 def __init__(self
, msg
, rc
, output
):
97 Exception.__init
__(self
, msg
)
100 def __init__(self
, name
, data
, build_directory
, artefact_directory
, printer
, print_debug
= False):
106 self
.build_directory
= build_directory
107 self
.artefact_directory
= artefact_directory
108 self
.printer
= printer
109 self
.print_debug_messages
= print_debug
def derive(self, name, data):
    """Create a child TaskController sharing this controller's settings.

    Only the task name and dependency data differ; directories, printer
    and the debug flag are inherited from this instance.
    """
    child = TaskController(
        name,
        data,
        self.build_directory,
        self.artefact_directory,
        self.printer,
        self.print_debug_messages,
    )
    return child
def dprint(self, str, *args):
    """Emit a %-formatted debug message when debug output is enabled.

    NOTE: the parameter name `str` shadows the builtin; it is kept
    because it is part of the caller-visible signature.
    """
    if not self.print_debug_messages:
        return
    self.printer.print_debug(self.name, str % args)
119 def get_dependency_data(self
, dep
, key
=None):
121 return self
.get_data(dep
)
122 return self
.data
[dep
][key
]
def get_data(self, key):
    """Look *key* up across all dependency data buckets.

    Buckets are searched in insertion order; the first bucket containing
    the key wins. Raises TaskException when no bucket has it.
    """
    for bucket in self.data.values():
        if key in bucket:
            return bucket[key]
    raise TaskException("WARN: unknown key %s" % key)
130 def run_command(self
, cmd
, cwd
=None, needs_output
=False):
131 self
.dprint("Running `%s'..." % ' '.join(cmd
))
136 # FIXME: can we keep stdout and stderr separated?
137 with subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, cwd
=cwd
) as proc
:
138 for line
in proc
.stdout
:
139 line
= line
.decode('utf-8').strip('\n')
140 self
.append_line_to_log_file(line
)
147 #self.dprint('stderr=%s' % stderr.strip('\n'))
149 raise RunCommandException(
150 "`%s' failed: %s" % (' '.join(cmd
), last_line
),
155 'stdout': '\n'.join(output
),
156 'stderr': '\n'.join(output
),
158 'failed': not rc
== 0
def make_temp_dir(self, name):
    """Ensure a named subdirectory of the build directory exists.

    Returns the absolute path of the (possibly freshly created) directory.
    """
    path = '%s/%s' % ( self.build_directory, name )
    os.makedirs(path, exist_ok=True)
    return os.path.abspath(path)
def recursive_copy(self, src_dir, dest_dir):
    """Replicate src_dir's contents into dest_dir via rsync.

    The destination directory is created first when missing.
    """
    os.makedirs(dest_dir, exist_ok=True)
    command = [ 'rsync', '-a', src_dir + '/', dest_dir ]
    self.run_command(command)
def set_log_file(self, log_filename):
    """Open this task's log file among the downloadable artefacts."""
    # TODO: open the log lazily
    # TODO: propagate the information to XML report
    relative_name = 'logs/' + log_filename
    self.log = self.open_downloadable_file(relative_name, 'w')
def append_line_to_log_file(self, line):
    """Append one line to the task log file and to the in-memory tail.

    :param line: a single log line without a trailing newline.

    The log file may be absent (self.log is None), in which case only
    the tail is updated. Only the last 10 lines are retained in
    self.log_tail (used for the report's <log> excerpt).
    """
    # PEP 8 idiom: "is not None" (original read "not self.log is None").
    if self.log is not None:
        self.log.write(line + '\n')
    self.log_tail.append(line)
    self.log_tail = self.log_tail[-10:]
181 def get_artefact_absolute_path(self
, relative_name
, create_dirs
=False):
182 base
= os
.path
.dirname(relative_name
)
183 name
= os
.path
.basename(relative_name
)
184 dname
= '%s/%s/' % ( self
.artefact_directory
, base
)
187 os
.makedirs(dname
, exist_ok
=True)
189 return os
.path
.abspath(dname
+ name
)
191 # TODO: propagate title + download_name to the report
192 def add_downloadable_file(self
, title
, download_name
, current_filename
):
193 self
.dprint("Downloadable `%s' at %s", title
, download_name
)
195 'filename' : download_name
,
199 target
= self
.get_artefact_absolute_path(download_name
, True)
200 shutil
.copy(current_filename
, target
)
def open_downloadable_file(self, download_name, mode):
    """Open an artefact file for reading/writing and return the file object.

    Parent directories are created as needed (create_dirs=True).
    """
    absolute_path = self.get_artefact_absolute_path(download_name, True)
    return open(absolute_path, mode)
208 if not self
.log
is None:
214 def ret(self
, *args
, **kwargs
):
220 elif status
== False:
228 result
['data'][k
] = kwargs
[k
]
235 def __init__(self
, id, task
, description
, deps
, mutexes
):
237 self
.dependencies
= deps
238 self
.description
= description
241 self
.completed
= False
244 self
.mutexes
= mutexes
246 def has_completed_okay(self
):
248 return self
.completed
and (self
.status
== 'ok')
250 def has_finished(self
):
252 return self
.completed
254 def get_status(self
):
262 def set_status(self
, status
, reason
):
267 def set_done(self
, data
):
270 self
.completed
= True
def set_skipped(self, reason):
    """Mark this task as skipped, recording the human-readable reason."""
    self.set_status('skip', reason)
276 class BuildScheduler
:
277 def __init__(self
, max_workers
, build
, artefact
, build_id
, printer
, debug
= False):
279 'build-directory': build
,
280 'artefact-directory': artefact
,
283 self
.printer
= printer
285 self
.start_timestamp
= time
.time()
286 self
.start_date
= datetime
.datetime
.now(datetime
.timezone
.utc
).replace(microsecond
=0).astimezone().isoformat(' ')
288 # Parent task controller
289 self
.ctl
= TaskController('scheduler', {}, build
, artefact
, self
.printer
, debug
)
292 self
.report_file
= self
.ctl
.open_downloadable_file('report.xml', 'w')
293 self
.report_file
.write("<?xml version=\"1.0\"?>\n")
294 self
.report_file
.write("<build number=\"%s\">\n" % build_id
)
296 # The following attributes (up to self.guard declaration) are guarded
297 # by the self.guard mutex and use self.cond to notify about changes
299 # Lower granularity of the locking would be possible but would
300 # complicate too much the conditions inside queue processing where we
301 # need to react to multiple events (new task added vs some task
302 # terminated vs selecting the right task to be executed).
304 # Known tasks (regardless of their state). The mapping is between
305 # a task id and TaskWrapper class.
308 # Queue of tasks not yet run (uses task ids only). We insert mutex
309 # tasks at queue beginning (instead of appending) as a heuristic to
310 # prevent accumulation of mutex tasks at the end of run where it could
311 # hurt concurrent execution.
314 # Number of currently running (executing) tasks. Used solely to
315 # control number of concurrently running tasks.
316 self
.running_tasks_count
= 0
318 # Flag for the queue processing whether to terminate the loop to allow
319 # clean termination of the executor.
320 self
.terminate
= False
322 # Here we record which mutexes are held by executing tasks. Mutexes are
323 # identified by their (string) name that is used as index. When the
324 # value is True, the mutex is held (i.e. do not run any other task
325 # claming the same mutex), mutex is not held when the value is False
326 # or when the key is not present at all.
327 self
.task_mutexes
= {}
329 # Condition variable guarding the above attributes.
330 # We initialize CV only without attaching a lock as it creates one
331 # automatically and CV serves as a lock too.
333 # Always use notify_all as we are waiting in multiple functions
334 # for it (e.g. while processing the queue or in barrier).
335 self
.guard
= Condition()
337 # Lock guarding output synchronization
338 self
.output_lock
= Lock()
340 # Executor for running of individual tasks
341 from concurrent
.futures
import ThreadPoolExecutor
342 self
.max_workers
= max_workers
343 self
.executor
= ThreadPoolExecutor(max_workers
=max_workers
+ 2)
345 # Start the queue processor
346 self
.executor
.submit(BuildScheduler
.process_queue_wrapper
, self
)
348 def process_queue_wrapper(self
):
350 To allow debugging of the queue processor.
356 traceback
.print_exc()
358 def submit(self
, description
, task_id
, task
, deps
= [], mutexes
= []):
360 #print("Submitting {} ({}, {}, {})".format(description, task_id, deps, mutexes))
361 # Check that dependencies are known
363 if not d
in self
.tasks
:
364 raise Exception('Dependency %s is not known.' % d
)
366 wrapper
= TaskWrapper(task_id
, task
, description
, deps
, mutexes
)
367 self
.tasks
[task_id
] = wrapper
369 # Append to the queue
370 # We use a simple heuristic: if the task has no mutexes, we
371 # append to the end of the queue. Otherwise we prioritize the
372 # task a little bit to prevent ending with serialized execution
373 # of the mutually excluded tasks. (We add before first non-mutexed
379 if (len(self
.tasks
[q
].mutexes
) == 0) and (not inserted
):
380 new_queue
.append(task_id
)
384 new_queue
.append(task_id
)
385 self
.queue
= new_queue
387 self
.queue
.append(task_id
)
389 self
.guard
.notify_all()
391 def task_run_wrapper(self
, wrapper
, task_id
, can_be_run
):
393 self
.task_run_inner(wrapper
, task_id
, can_be_run
)
396 traceback
.print_exc()
def xml_escape_line(self, s):
    """Return *s* made safe for inclusion as XML text content.

    Control characters (except TAB, \\x09) are stripped, XML
    metacharacters are escaped and any non-ASCII character becomes a
    numeric character reference.
    """
    from xml.sax.saxutils import escape

    stripped = re.sub(r'[\x00-\x08\x0A-\x1F]', '', s)
    ascii_safe = escape(stripped).encode('ascii', 'xmlcharrefreplace')
    return ascii_safe.decode('utf8')
408 def task_run_inner(self
, wrapper
, task_id
, can_be_run
):
412 for task_dep_id
in wrapper
.dependencies
:
413 task_dep
= self
.tasks
[task_dep_id
]
414 data
[task_dep_id
] = task_dep
.get_data()
416 wrapper
.task
.ctl
= self
.ctl
.derive(task_id
, data
)
417 wrapper
.task
.ctl
.set_log_file('%s.log' % task_id
)
420 self
.announce_task_started_(wrapper
)
423 res
= wrapper
.task
.execute()
424 if (res
== True) or (res
is None):
435 except Exception as e
:
441 #traceback.print_exc()
444 for task_dep_id
in wrapper
.dependencies
:
445 task_dep
= self
.tasks
[task_dep_id
]
446 if task_dep
.has_finished() and (not task_dep
.has_completed_okay()):
447 reason
= 'dependency %s failed (or also skipped).' % task_dep_id
452 wrapper
.task
.ctl
.append_line_to_log_file('Skipped: %s' % reason
)
454 status
= res
['status']
455 report
= wrapper
.task
.get_report()
457 if (not report
['name'] is None) and (not self
.report_file
is None):
458 report_xml
= '<' + report
['name']
459 report
['attrs']['result'] = status
460 for key
in report
['attrs']:
461 report_xml
= report_xml
+ ' %s="%s"' % (key
, report
['attrs'][key
] )
462 report_xml
= report_xml
+ ' log="logs/%s.log"' % wrapper
.id
463 report_xml
= report_xml
+ ">\n"
465 if 'files' in report
:
466 for f
in report
['files']:
467 file = '<file title="%s" filename="%s" />\n' % ( f
['title'], f
['filename'])
468 report_xml
= report_xml
+ file
470 if (not wrapper
.task
.ctl
is None) and (len(wrapper
.task
.ctl
.log_tail
) > 0):
471 report_xml
= report_xml
+ ' <log>\n'
472 for line
in wrapper
.task
.ctl
.log_tail
:
473 report_xml
= report_xml
+ ' <logline>' + self
.xml_escape_line(line
) + '</logline>\n'
474 report_xml
= report_xml
+ ' </log>\n'
476 report_xml
= report_xml
+ '</' + report
['name'] + ">\n"
479 self
.report_file
.write(report_xml
)
481 wrapper
.set_status(status
, reason
)
482 self
.announce_task_finished_(wrapper
)
483 wrapper
.set_done(res
['data'])
486 self
.running_tasks_count
= self
.running_tasks_count
- 1
489 for m
in wrapper
.mutexes
:
490 self
.task_mutexes
[ m
] = False
492 #print("Task finished, waking up (running now {})".format(self.running_tasks_count))
493 self
.guard
.notify_all()
496 def process_queue(self
):
499 #print("Process queue running, tasks {}".format(len(self.queue)))
500 # Break inside the loop
502 slot_available
= self
.running_tasks_count
< self
.max_workers
503 task_available
= len(self
.queue
) > 0
504 #print("Queue: {} (running {})".format(len(self.queue), self.running_tasks_count))
505 if slot_available
and task_available
:
507 if self
.terminate
and (not task_available
):
510 #print("Queue waiting for free slots (running {}) or tasks (have {})".format(self.running_tasks_count, len(self.queue)))
512 #print("Guard woken-up after waiting for free slots.")
514 # We have some tasks in the queue and we can run at
516 ( ready_task_id
, can_be_run
) = self
.get_first_ready_task_id_()
517 #print("Ready task is {}".format(ready_task_id))
519 if ready_task_id
is None:
520 #print("Queue waiting for new tasks to appear (have {})".format(len(self.queue)))
522 #print("Guard woken-up after no ready task.")
524 # Remove the task from the queue
525 self
.queue
.remove(ready_task_id
)
527 ready_task
= self
.tasks
[ready_task_id
]
529 # Need to update number of running tasks here and now
530 # because the executor might start the execution later
531 # and we would evaluate incorrectly the condition above
532 # that we can start another task.
533 self
.running_tasks_count
= self
.running_tasks_count
+ 1
535 #print("Ready is {}".format(ready_task))
537 for m
in ready_task
.mutexes
:
538 self
.task_mutexes
[ m
] = True
539 #print("Actually starting task {}".format(ready_task_id))
540 self
.executor
.submit(BuildScheduler
.task_run_wrapper
,
541 self
, ready_task
, ready_task_id
, can_be_run
)
543 def get_first_ready_task_id_(self
):
545 Return tuple of first task that can be run (or failed immediately)
546 with note whether the result is predetermined.
547 Returns None when no task can be run.
549 # We assume self.guard was already acquired
550 # We use here the for ... else construct of Python (recall that else
551 # is taken when break is not used)
552 for task_id
in self
.queue
:
553 task
= self
.tasks
[task_id
]
554 for task_dep_id
in task
.dependencies
:
555 task_dep
= self
.tasks
[ task_dep_id
]
556 if not task_dep
.has_finished():
558 # Failed dependency means we can return now
559 if task_dep
.get_status() != 'ok':
560 return ( task_id
, False )
562 for task_mutex
in task
.mutexes
:
563 if (task_mutex
in self
.task_mutexes
) and self
.task_mutexes
[ task_mutex
]:
566 return ( task_id
, True )
567 return ( None, None )
569 def announce_task_started_(self
, task
):
570 self
.printer
.print_starting(task
.description
+ " ...")
572 def announce_task_finished_(self
, task
):
573 description
= task
.description
574 if task
.status
== 'ok':
576 msg_color
= self
.printer
.GREEN
577 description
= description
+ '.'
578 elif task
.status
== 'skip':
580 msg_color
= self
.printer
.CYAN
581 description
= description
+ ': ' + task
.reason
584 msg_color
= self
.printer
.RED
585 description
= description
+ ': ' + task
.reason
587 self
.printer
.print_finished(msg_color
, msg
, description
)
592 #print("Barrier ({}, {})...".format(self.running_tasks_count, len(self.queue)))
593 while (self
.running_tasks_count
> 0) or (len(self
.queue
) > 0):
594 #print("Barrier waiting ({}, {})...".format(self.running_tasks_count, len(self.queue)))
599 self
.terminate
= True
600 self
.guard
.notify_all()
603 self
.executor
.shutdown(True)
605 def close_report(self
):
606 if not self
.report_file
is None:
607 end_time
= time
.time()
608 self
.report_file
.write("<buildinfo started=\"{}\" duration=\"{}\" parallelism=\"{}\" />\n".format(
609 self
.start_date
, ( end_time
- self
.start_timestamp
) * 1000,
612 self
.report_file
.write("</build>\n")
613 self
.report_file
.close()
614 self
.report_file
= None