Sorry for the combined patch. The changes became too inter-dependent.
[beagle.git] / Util / Scheduler.cs
blob7dbf703463553f63725cd5de15f773db8bf435ed
1 //
2 // Scheduler.cs
3 //
4 // Copyright (C) 2004-2005 Novell, Inc.
5 //
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a
9 // copy of this software and associated documentation files (the "Software"),
10 // to deal in the Software without restriction, including without limitation
11 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 // and/or sell copies of the Software, and to permit persons to whom the
13 // Software is furnished to do so, subject to the following conditions:
15 // The above copyright notice and this permission notice shall be included in
16 // all copies or substantial portions of the Software.
18 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 // DEALINGS IN THE SOFTWARE.
27 using System;
28 using System.Collections;
29 using System.Text;
30 using System.Threading;
31 using System.Xml;
32 using System.Xml.Serialization;
34 namespace Beagle.Util {
36 public class Scheduler {
38 // Fire an event if there are no tasks left to execute.
39 public delegate void EmptyQueueDelegate ();
40 public event EmptyQueueDelegate EmptyQueueEvent;
42 public static bool Debug = false;
// Task priorities, ordered from lowest (0) to highest (4).
// The numeric values matter: the scheduler compares them directly.
public enum Priority {

	Shutdown    = 0, // Do it on shutdown

	Idle        = 1, // Execute only when the whole machine is idle
	                 // Probably should be reserved for computationally-expensive stuff
	                 // FIXME: These are not properly scheduled right now

	Maintenance = 2, // Only execute when there are no higher-priority
	                 // tasks from the same source to execute instead

	Delayed     = 3, // Do it soon

	Immediate   = 4, // Do it right now
}
60 public delegate void Hook ();
61 public delegate void TaskHook (Task task);
63 //////////////////////////////////////////////////////////////////////////////
65 public abstract class Task : IComparable {
67 private string tag = null;
68 private Priority priority = Priority.Delayed;
69 private int sub_priority = 0;
70 private DateTime trigger_time = DateTime.MinValue;
71 private DateTime timestamp; // when added to the scheduler
73 // Some metadata
74 public string Creator;
75 public string Description;
77 public object Source = null; // this is just an opaque identifier
79 public ITaskCollector Collector = null;
80 public double Weight = 1.0;
82 public bool Reschedule = false;
84 private ArrayList task_groups = null;
85 private TaskGroupPrivate child_task_group = null;
87 ///////////////////////////////
// The tag is the task's unique identifier.  It may be assigned freely
// until the task has both a tag and a scheduler; after that any
// attempt to change it throws.
public string Tag {

	get { return tag; }

	set {
		bool locked_in = (tag != null && scheduler != null);
		if (locked_in)
			throw new Exception ("Can't change tag of " + tag + "!");
		tag = value;
	}
}
// The task's priority.  Assigning a different value asks the
// scheduler (if any) to recompute.
public Priority Priority {

	get { return priority; }

	set {
		if (priority == value)
			return;

		priority = value;
		Recompute ();
	}
}
// Secondary ordering key, compared after Priority.  Assigning a
// different value asks the scheduler (if any) to recompute.
public int SubPriority {

	get { return sub_priority; }

	set {
		if (sub_priority == value)
			return;

		sub_priority = value;
		Recompute ();
	}
}
// The task's trigger time.  Assigning a different value asks the
// scheduler (if any) to recompute.
public DateTime TriggerTime {

	get { return trigger_time; }

	set {
		if (trigger_time == value)
			return;

		trigger_time = value;
		Recompute ();
	}
}
// When the task was last handed to a scheduler (set by Schedule).
public DateTime Timestamp {
	get {
		return timestamp;
	}
}
144 ///////////////////////////////
// Attach this task to a task group.  The list of groups is created
// lazily on first use.
public void AddTaskGroup (TaskGroup group)
{
	ArrayList groups = task_groups;
	if (groups == null) {
		groups = new ArrayList ();
		task_groups = groups;
	}

	groups.Add (group);
}
// Call Increment on every unfinished group this task belongs to.
private void IncrementAllTaskGroups ()
{
	if (task_groups == null)
		return;

	foreach (TaskGroupPrivate group in task_groups) {
		if (group.Finished)
			continue;
		group.Increment ();
	}
}
// Call Decrement on every unfinished group this task belongs to.
private void DecrementAllTaskGroups ()
{
	if (task_groups == null)
		return;

	foreach (TaskGroupPrivate group in task_groups) {
		if (group.Finished)
			continue;
		group.Decrement ();
	}
}
// Call Touch on every unfinished group this task belongs to.
private void TouchAllTaskGroups ()
{
	if (task_groups == null)
		return;

	foreach (TaskGroupPrivate group in task_groups) {
		if (group.Finished)
			continue;
		group.Touch ();
	}
}
183 ///////////////////////////////
185 private Scheduler scheduler = null;
// Bind this task to a scheduler: stamp it with the current time and
// clear any previous cancellation.  The task groups are only
// incremented when the task did not already have a scheduler.
public void Schedule (Scheduler scheduler)
{
	bool first_time = (this.scheduler == null);
	if (first_time)
		IncrementAllTaskGroups ();

	this.cancelled = false;
	this.scheduler = scheduler;
	this.timestamp = DateTime.Now;
}
// Forward a recompute request to our scheduler, if we have one.
private void Recompute ()
{
	if (scheduler == null)
		return;

	scheduler.Recompute ();
}
204 ///////////////////////////////
// A blocked task will not execute.  A task is blocked while it has
// a child task group that has not finished yet.
public bool Blocked {
	get {
		if (child_task_group == null)
			return false;

		return ! child_task_group.Finished;
	}
}
214 ///////////////////////////////
216 private bool cancelled = false;
// True once Cancel has been called; reset to false by Schedule.
public bool Cancelled {
	get {
		return cancelled;
	}
}
// Cancel the task.  The first call releases our task groups and runs
// Cleanup; later calls do nothing.
public void Cancel ()
{
	if (cancelled)
		return;

	DecrementAllTaskGroups ();
	Cleanup (); // clean up after cancelled tasks

	// Only flag ourselves once the bookkeeping above is done.
	cancelled = true;
}
231 ///////////////////////////////
233 // The Task's count keeps track of how many
234 // times it has been executed.
236 private int count = 0;
// Value of the task's counter, which DoTask bumps each time the
// task asks to be rescheduled.
public int Count {
	get {
		return count;
	}
}
242 ///////////////////////////////
// Schedule child_task as a child of this task.  The child inherits
// our Source and joins our child task group; while that group is
// unfinished this task reports itself as Blocked.
//
// Throws ArgumentNullException if child_task is null, and
// InvalidOperationException if this task has no scheduler (children
// can only be spawned while the task is scheduled, e.g. from
// DoTaskReal).  Both checks happen before anything is modified, so a
// failed call leaves the child and this task untouched.
public void SpawnChild (Task child_task)
{
	if (child_task == null)
		throw new ArgumentNullException ("child_task");

	if (scheduler == null)
		throw new InvalidOperationException ("Can't spawn a child of " + Tag
						     + ": the task is not scheduled");

	if (child_task_group == null)
		child_task_group = new TaskGroupPrivate ("Children of " + Tag, null, null);

	child_task.AddTaskGroup (child_task_group);
	child_task.Source = this.Source;
	scheduler.Add (child_task);
}
253 ///////////////////////////////
// Run the task once.  Cancelled tasks are silently skipped.
//
// Any exception thrown by DoTaskReal is logged and swallowed.  If
// the task set Reschedule while running, it is handed back to the
// scheduler; otherwise its task groups are decremented and it is
// detached from the scheduler.
public void DoTask ()
{
	if (cancelled)
		return;

	if (Debug)
		Logger.Log.Debug ("Starting task {0}", Tag);

	// Start every run with a clean slate.
	child_task_group = null;
	Reschedule = false;
	TouchAllTaskGroups ();

	Stopwatch timer = new Stopwatch ();
	timer.Start ();

	try {
		DoTaskReal ();
	} catch (Exception e) {
		Logger.Log.Warn ("Caught exception in DoTaskReal");
		Logger.Log.Warn (" Tag: {0}", Tag);
		Logger.Log.Warn (" Creator: {0}", Creator);
		Logger.Log.Warn ("Description: {0}", Description);
		Logger.Log.Warn (" Priority: {0} ({1})", Priority, SubPriority);
		Logger.Log.Warn (e);
	}

	timer.Stop ();
	if (Debug)
		Logger.Log.Debug ("Finished task {0} in {1}", Tag, timer);

	if (! Reschedule) {
		DecrementAllTaskGroups ();
		scheduler = null;
		return;
	}

	++count;
	scheduler.Add (this); // re-add ourselves
}
291 protected abstract void DoTaskReal ();
293 ///////////////////////////////
// Cleanup is for tasks that we know will never be executed: it is
// invoked when a task is cancelled, and on whatever is still queued
// when the scheduler's worker exits.  Exceptions thrown by DoCleanup
// are logged and swallowed.
public void Cleanup ()
{
	try {
		DoCleanup ();
	} catch (Exception e) {
		Logger.Log.Warn ("Caught exception cleaning up task '{0}'", Tag);
		Logger.Log.Warn (e);
	}
}
// Hook for subclasses that hold something which must be released
// when the task is dropped without running.  The base version is
// intentionally empty.
protected virtual void DoCleanup ()
{
}
315 ///////////////////////////////
// Sort from lowest to highest priority.
// FIXME: This does not define a total ordering
// on the set of all tasks, so use it with care.
//
// Returns a negative value if this task should run after obj, a
// positive value if it should run before, and 1 if obj is not a Task.
// Only the sign of the result is meaningful.
public int CompareTo (object obj)
{
	Task other = obj as Task;
	if (other == null)
		return 1;

	Priority this_priority = this.Priority;
	Priority other_priority = other.Priority;

	// To other sources, Maintenance tasks look like
	// Delayed tasks.
	if (this.Source != other.Source) {
		if (this_priority == Priority.Maintenance)
			this_priority = Priority.Delayed;
		if (other_priority == Priority.Maintenance)
			other_priority = Priority.Delayed;
	}

	int cmp;

	// Priority values are tiny, so a plain difference cannot overflow.
	cmp = (int) this_priority - (int) other_priority;
	if (cmp != 0)
		return cmp;

	// Sub-priorities are arbitrary ints: subtracting them can overflow
	// and flip the sign, so compare instead.
	cmp = this.SubPriority.CompareTo (other.SubPriority);
	if (cmp != 0)
		return cmp;

	// Tasks that were added to the scheduler earlier take
	// precedence over those that were added later.
	cmp = DateTime.Compare (other.Timestamp, this.Timestamp);
	if (cmp != 0)
		return cmp;

	// Try to break any ties.  Hash codes span the whole int range,
	// so again compare rather than subtract.
	return this.GetHashCode ().CompareTo (other.GetHashCode ());
}
// Append a multi-line, human-readable description of this task to sb:
// priority and timestamp, the tag, any remaining hold time, and the
// optional creator/description metadata.
public void AppendToStringBuilder (StringBuilder sb)
{
	// First line: "<priority> <sub-priority> (<timestamp>)"
	sb.Append (Priority);
	sb.Append (' ');
	sb.Append (SubPriority);
	sb.Append (" (");
	sb.Append (Timestamp);
	sb.Append (")\n");

	sb.Append (Tag);
	sb.Append ('\n');

	// Only mention the trigger time if it is still in the future.
	double seconds_left = (TriggerTime - DateTime.Now).TotalSeconds;
	if (seconds_left > 0) {
		if (seconds_left >= 120) {
			sb.Append ("Hold until ");
			sb.Append (TriggerTime);
			sb.Append ('\n');
		} else {
			sb.AppendFormat ("Hold for {0:0.00} seconds\n", seconds_left);
		}
	}

	if (Creator != null) {
		sb.Append ("Creator: ");
		sb.Append (Creator);
		sb.Append ('\n');
	}

	if (Description != null) {
		sb.Append (Description);
		sb.Append ('\n');
	}
}
// A Task whose work is a TaskHook delegate.  The delegate receives
// the wrapping task as its argument.
private class TaskHookWrapper : Task {

	TaskHook hook;

	public TaskHookWrapper (TaskHook hook)
	{
		this.hook = hook;
	}

	protected override void DoTaskReal ()
	{
		// A wrapper around a null hook is a no-op task.
		if (hook == null)
			return;

		hook (this);
	}
}
// Wrap a TaskHook delegate in a Task.
public static Task TaskFromHook (TaskHook hook)
{
	Task wrapper = new TaskHookWrapper (hook);
	return wrapper;
}
406 //////////////////////////////////////////////////////////////////////////////
409 // Task Groups
// Create a task group.  pre_hook runs when the first task of the
// group is executed, post_hook once the group's last task is done;
// either may be null.
public static TaskGroup NewTaskGroup (string name, Hook pre_hook, Hook post_hook)
{
	TaskGroup group = new TaskGroupPrivate (name, pre_hook, post_hook);
	return group;
}
// The public face of a task group.  Callers only get to see the
// group's name and whether it has finished; the bookkeeping lives
// in the private implementation.
public interface TaskGroup {

	// The name the group was created with.
	string Name { get; }

	// Whether the group has finished.
	bool Finished { get; }
}
// The real task group: counts its member tasks, fires pre_hook the
// first time one of them runs and post_hook when the count drops back
// to zero after that.
private class TaskGroupPrivate : TaskGroup {

	private string name;
	private int task_count = 0;
	private bool touched = false;
	private bool finished = false;
	private Hook pre_hook;
	private Hook post_hook;

	public TaskGroupPrivate (string name,
				 Hook pre_hook,
				 Hook post_hook)
	{
		this.name = name;
		this.pre_hook = pre_hook;
		this.post_hook = post_hook;
	}

	public string Name {
		get { return name; }
	}

	public bool Finished {
		get { return finished; }
	}

	// Run a hook (if there is one), logging any exception it
	// throws instead of letting it escape.
	private void RunHook (Hook hook, string warning_format)
	{
		if (hook == null)
			return;

		try {
			hook ();
		} catch (Exception e) {
			Logger.Log.Warn (warning_format, Name);
			Logger.Log.Warn (e);
		}
	}

	// Call this when a task is added to the task group.
	public void Increment ()
	{
		if (finished)
			throw new Exception ("Tried to increment a finished TaskGroup");

		++task_count;
	}

	// Call this when we execute a task in the task group.
	public void Touch ()
	{
		if (finished)
			throw new Exception ("Tried to touch a finished TaskGroup");

		if (touched)
			return;

		RunHook (pre_hook, "Caught exception in pre_hook of task group '{0}'");
		touched = true;
	}

	// Call this after a task in the task group is complete.
	public void Decrement ()
	{
		if (finished)
			throw new Exception ("Tried to decrement a finished TaskGroup");

		--task_count;

		// Only fire our post-hook if the pre-hook fired
		// (or would have fired, had it been non-null)
		if (task_count != 0 || ! touched)
			return;

		RunHook (post_hook, "Caught exception in post_hook of task group '{0}'");
		finished = true;
	}
}
499 //////////////////////////////////////////////////////////////////////////////
502 // Task Collector
504 // This is a mechanism for executing tasks in sets, possibly outside of
505 // priority order.
// Implemented by anything that wants its tasks run in batches.
public interface ITaskCollector {

	// The weight budget for one batch: the worker keeps adding
	// tasks to the batch while this budget has not been used up.
	double GetMaximumWeight ();

	// Invoked by the worker before and after it runs a batch.
	void PreTaskHook ();
	void PostTaskHook ();
}
516 //////////////////////////////////////////////////////////////////////////////
518 private static double global_delay = -1.0;
// Read the BEAGLE_EXERCISE_THE_DOG environment variable, which
// overrides the computed delay between tasks:
//   - unset:       leave global_delay alone (normal delay computation);
//   - "t<number>": use <number> seconds as the delay for every task;
//   - anything else (including an unparsable "t..." value): no delay.
static Scheduler ()
{
	string exercise;
	exercise = Environment.GetEnvironmentVariable ("BEAGLE_EXERCISE_THE_DOG");

	if (exercise == null)
		return;

	// "t" followed by at least one character is a delay request.
	// (Requiring Length > 2 would reject one-digit values like "t5".)
	if (exercise.Length > 1 && exercise [0] == 't') {
		try {
			global_delay = Double.Parse (exercise.Substring (1));
		} catch (FormatException) {
			// Never let a typo in an environment variable escape
			// from the type initializer: that would make the whole
			// Scheduler class unusable.
			global_delay = 0.0;
		} catch (OverflowException) {
			global_delay = 0.0;
		}
	} else {
		global_delay = 0.0;
	}
}
533 //////////////////////////////////////////////////////////////////////////////
535 private static Scheduler global = new Scheduler ();
// The shared, process-wide scheduler instance.
public static Scheduler Global {
	get {
		return global;
	}
}
541 //////////////////////////////////////////////////////////////////////////////
543 private object big_lock = new object ();
545 // FIXME: shutdown tasks should probably be ordered by something
546 private Queue shutdown_task_queue = new Queue ();
548 private Hashtable tasks_by_tag = new Hashtable ();
549 private int total_executed_task_count = 0;
// Hand a task to the scheduler.
//
// Null tasks are ignored; a task without a Source is rejected with
// an exception.  Shutdown-priority tasks go to the shutdown queue and
// everything else is filed under its tag, replacing (and cancelling)
// any different task that was previously filed under the same tag.
// Re-adding a task that is already filed only refreshes its timestamp.
public void Add (Task task)
{
	if (task == null)
		return;

	if (task.Source == null)
		throw new Exception ("Attempting to add Task with no source!");

	Task displaced = null;

	lock (big_lock) {

		// Keep track of when immediate priority tasks are
		// added so that we can throttle if the scheduler
		// is being slammed with them.
		if (task.Priority == Priority.Immediate) {
			// Shift our times down by one
			for (int i = 0; i < 4; ++i)
				last_immediate_times [i] = last_immediate_times [i + 1];
			last_immediate_times [4] = DateTime.Now;
		}

		displaced = tasks_by_tag [task.Tag] as Task;

		task.Schedule (this);

		// Re-adding the same task is basically a no-op --- we
		// just update the timestamp and return.
		if (displaced == task)
			return;

		if (Debug) {
			Logger.Log.Debug ("Adding task");
			Logger.Log.Debug ("Tag: {0}", task.Tag);
			if (task.Description != null)
				Logger.Log.Debug ("Desc: {0}", task.Description);
		}

		if (task.Priority != Priority.Shutdown)
			tasks_by_tag [task.Tag] = task;
		else
			shutdown_task_queue.Enqueue (task);

		Monitor.Pulse (big_lock);
	}

	// If we clobbered another task, call cancel on it.
	// This happens after we release the lock, since
	// cancellation could result in a task group post-hook
	// being run.
	if (displaced != null)
		displaced.Cancel ();
}
// Look up the queued task filed under the given tag, or null if
// there is none.
public Task GetByTag (string tag)
{
	Task found;

	lock (big_lock) {
		found = tasks_by_tag [tag] as Task;
	}

	return found;
}
// True if a task with this tag is queued and has not been cancelled.
public bool ContainsByTag (string tag)
{
	Task task = GetByTag (tag);
	if (task == null)
		return false;

	return ! task.Cancelled;
}
// Pulse the scheduler's lock so that a thread waiting on it wakes up.
public void Recompute ()
{
	lock (big_lock) {
		Monitor.Pulse (big_lock);
	}
}
622 //////////////////////////////////////////////////////////////////////////////
624 private Thread thread = null;
625 public bool running = false;
626 private static bool shutdown_requested = false;
// Start the worker thread.  Does nothing if a shutdown has been
// requested or a worker thread is already registered.
public void Start ()
{
	// Use big_lock rather than lock (this): Stop () guards the same
	// running/thread state with big_lock, so both methods must take
	// the same lock for the check-then-start below to be safe.  The
	// scheduler object itself is also visible to outside code (via
	// Global), which makes it a poor lock target.
	lock (big_lock) {
		if (shutdown_requested || thread != null)
			return;
		running = true;
		thread = ExceptionHandlingThread.Start (new ThreadStart (Worker));
	}
}
// Ask the worker to stop.  to_shutdown is recorded as the
// shutdown-requested flag, which Start consults before starting a
// new worker.
public void Stop (bool to_shutdown)
{
	lock (big_lock) {
		shutdown_requested = to_shutdown;

		if (! running)
			return;

		running = false;
		thread = null;
		status_str = "Stopped";

		// Wake the worker up so that it notices.
		Monitor.Pulse (big_lock);
	}
}
// Stop the worker without requesting a shutdown.
public void Stop ()
{
	const bool to_shutdown = false;
	Stop (to_shutdown);
}
658 // Delay Computations
660 // This code controls how we space out tasks
663 // FIXME: random magic constants
664 const double idle_threshold = 5.314159 * 60; // probably should be longer
665 const double idle_ramp_up_time = 5.271828 * 60; // probably should be longer
666 const double default_delayed_rate_factor = 9.03; // work about 1/10th of the time
667 const double default_idle_rate_factor = 2.097; // work about 1/3rd of the time
668 const double maximum_delay = 20; // never wait for more than 20s
669 const double min_throttled_delay = 1.5; // never wait less than this when throttled
670 const double min_overloaded_delay = 2.2; // never wait less than this when there are many tasks
671 const int task_overload_threshold = 60; // number of tasks to process before delaying
673 DateTime[] last_immediate_times = new DateTime [5];
// Work out how long to pause before running the next task.
//
// The return value and duration_of_previous_task are both measured
// in seconds.  The pause is a multiple ("rate factor") of the time
// the previous task took, adjusted for the next task's priority, for
// how long the user has been idle, for the load average, and for
// bursts of immediate tasks.  A non-negative global_delay overrides
// all of this.
private double ComputeDelay (Priority priority_of_next_task,
			     double duration_of_previous_task,
			     int executed_task_count)
{
	if (global_delay >= 0.0)
		return global_delay;

	double rate_factor = 2.0;

	// Do everything faster the longer we are idle.
	double idle_time = SystemInformation.InputIdleTime;
	double idle_scale = 1.0;
	bool is_idle = false;
	bool need_throttle = false;

	// Never speed up if we are using the battery.
	if (idle_time > idle_threshold && ! SystemInformation.UsingBattery) {
		is_idle = true;
		double ramp = (idle_time - idle_threshold) / idle_ramp_up_time;
		idle_scale = (1 - Math.Min (ramp, 1.0));
	}

	bool overloaded = ! is_idle && executed_task_count >= task_overload_threshold;

	if (priority_of_next_task == Priority.Immediate) {

		rate_factor = 0;

		// We need to throttle if
		//  - we have seen at least five immediate tasks at all,
		//  - the newest one was added less than a second ago
		//    (so there is still a torrent of events coming in),
		//  - and the five most recent ones all arrived within
		//    a second of each other.
		bool torrent =
			last_immediate_times [0] != DateTime.MinValue
			&& DateTime.Now.Subtract (last_immediate_times [4]).TotalSeconds <= 1
			&& last_immediate_times [4].Subtract (last_immediate_times [0]).TotalSeconds <= 1;

		if (torrent) {
			need_throttle = true;
			rate_factor = idle_scale * default_idle_rate_factor;
		}

		// If we've processed many tasks since the last
		// time we took a break, ignore the priority and set a
		// delay equivalent to Priority.Delayed.
		if (overloaded)
			rate_factor = idle_scale * default_delayed_rate_factor;

	} else if (priority_of_next_task == Priority.Delayed) {

		rate_factor = idle_scale * default_delayed_rate_factor;

	} else if (priority_of_next_task == Priority.Idle) {

		rate_factor = idle_scale * default_idle_rate_factor;
	}

	// FIXME: we should do something more sophisticated than this
	// with the load average.
	// Random numbers galore!
	double load_average = SystemInformation.LoadAverageOneMinute;
	if (load_average > 3.001)
		rate_factor *= 5.002;
	else if (load_average > 1.5003)
		rate_factor *= 2.004;

	double delay = rate_factor * duration_of_previous_task;

	// space out delayed tasks a bit when we aren't idle
	if (! is_idle && priority_of_next_task == Priority.Delayed)
		delay = Math.Max (delay, 0.5);

	delay = Math.Min (delay, maximum_delay);

	// If we need to throttle, make sure we don't delay less than
	// a second and some.
	if (need_throttle)
		delay = Math.Max (delay, min_throttled_delay);

	// If we're not idle and we've just processed more
	// than a certain number of events, take a break.
	if (overloaded)
		delay = Math.Max (delay, min_overloaded_delay);

	return delay;
}
779 // The main loop
// Convert a number of seconds into a TimeSpan.  Anything below one
// millisecond (negative values included) becomes a zero-length span,
// because Monitor.Wait rejects negative time spans.
private static TimeSpan TimeSpanFromSeconds (double t)
{
	// 1 tick = 100 nanoseconds
	const double ticks_per_second = 1.0e+7;

	double seconds = (t < 0.001) ? 0 : t;
	return new TimeSpan ((long) (seconds * ticks_per_second));
}
796 private string status_str = null;
// The scheduler's main loop; runs on the worker thread until
// 'running' is cleared.
//
// Each pass picks the best runnable task under big_lock, possibly
// waits (for a trigger time, or to space tasks out) and starts over,
// and otherwise removes the chosen task -- or a whole batch sharing
// its collector -- from the queue and executes it outside the lock.
// After the loop ends, the shutdown tasks are executed and Cleanup is
// called on everything that is still queued.
private void Worker ()
{
	DateTime end_time_of_previous_task = DateTime.MinValue;
	double duration_of_previous_task = 0.0;

	Hook pre_hook = null;
	Hook post_hook = null;
	ArrayList to_be_executed = new ArrayList ();
	Hashtable max_priority_by_source = new Hashtable ();
	int executed_task_count = 0;
	StringBuilder status_builder = new StringBuilder ();

	while (running) {

		status_str = "Finding next task to execute";

		lock (big_lock) {

			// If there are no pending tasks, wait
			// on our lock and then re-start our
			// while loop
			if (tasks_by_tag.Count == 0) {
				if (EmptyQueueEvent != null)
					EmptyQueueEvent ();
				status_str = "Waiting on empty queue";
				Monitor.Wait (big_lock);
				executed_task_count = 0;
				continue;
			}

			// Walk across our list of tasks and find
			// the next one to execute.
			DateTime now = DateTime.Now;
			DateTime next_trigger_time = DateTime.MaxValue;

			// Make a first pass over our tasks, finding the
			// highest-priority item per source.
			max_priority_by_source.Clear ();
			foreach (Task task in tasks_by_tag.Values) {
				if (task.Blocked || task.TriggerTime >= now)
					continue;
				if (max_priority_by_source.Contains (task.Source)) {
					Priority p = (Priority) max_priority_by_source [task.Source];
					if (p < task.Priority)
						max_priority_by_source [task.Source] = task.Priority;
				} else {
					max_priority_by_source [task.Source] = task.Priority;
				}
			}

			// Now make a second pass over the tasks and find
			// the highest-priority item.  We use the information
			// from the first pass to correctly prioritize maintenance tasks.
			Task next_task = null;
			foreach (Task task in tasks_by_tag.Values) {
				if (task.Blocked)
					continue;
				if (task.TriggerTime >= now) {
					if (task.TriggerTime < next_trigger_time)
						next_trigger_time = task.TriggerTime;
					continue;
				}

				// If this is a maintenance task and there is a high-priority
				// task from the same source, skip it.
				if (task.Priority == Priority.Maintenance) {
					Priority p = (Priority) max_priority_by_source [task.Source];
					if (p > task.Priority)
						continue;
				}

				if (task.TriggerTime < now) {
					if (next_task == null || next_task.CompareTo (task) < 0)
						next_task = task;
				}
			}

			// If we didn't find a task, wait for the next trigger-time
			// and then re-start our while loop.
			if (next_task == null) {
				if (next_trigger_time == DateTime.MaxValue) {
					status_str = "Waiting for an unblocked task";
					Monitor.Wait (big_lock);
				} else {
					status_str = "Waiting for the next trigger time";

					// Monitor.Wait throws if the timeout is longer
					// than Int32.MaxValue milliseconds (about 24.8
					// days).  Clamp it: waking up early is harmless,
					// since we just go around the loop again.
					TimeSpan wait = next_trigger_time - now;
					if (wait.TotalMilliseconds > Int32.MaxValue)
						wait = TimeSpan.FromMilliseconds (Int32.MaxValue);
					Monitor.Wait (big_lock, wait);
				}
				executed_task_count = 0;
				continue;
			}

			// If we did find a task, do we want to execute it right now?
			// Or should we wait a bit?

			// How should we space things out?
			double delay = 0;
			delay = ComputeDelay (next_task.Priority, duration_of_previous_task, executed_task_count);
			delay = Math.Min (delay, (next_trigger_time - now).TotalSeconds);

			// Adjust by the time that has actually elapsed since the
			// last task.
			delay -= (now - end_time_of_previous_task).TotalSeconds;

			// If we still need to wait a bit longer, wait for the appropriate
			// amount of time and then re-start our while loop.
			if (delay > 0.001) {
				status_str = "Waiting for next task.";
				Monitor.Wait (big_lock, TimeSpanFromSeconds (delay));
				executed_task_count = 0;
				continue;
			}

			// If we've made it to this point, it is time to start
			// executing our selected task.

			to_be_executed.Clear ();

			// Forget the hooks of whatever we ran last time around.
			// Without this, the hooks of a previously-run collector
			// would keep firing around every later task that has no
			// collector of its own.
			pre_hook = null;
			post_hook = null;

			if (next_task.Collector == null) {

				to_be_executed.Add (next_task);

			} else {

				pre_hook = new Hook (next_task.Collector.PreTaskHook);
				post_hook = new Hook (next_task.Collector.PostTaskHook);

				// Find all eligible tasks with the same collector,
				// and add them to the collection list.
				now = DateTime.Now;
				foreach (Task task in tasks_by_tag.Values)
					if (task != next_task
					    && task.Collector == next_task.Collector
					    && !task.Blocked
					    && task.TriggerTime < now)
						to_be_executed.Add (task);

				// Order the tasks from highest to lowest priority.
				// Our original task will always be the first item
				// in the resulting array.
				to_be_executed.Sort ();
				to_be_executed.Add (next_task);
				to_be_executed.Reverse ();

				// Now find how many tasks can be executed before we
				// exceed the collector's maximum weight.  If necessary,
				// prune the list of tasks.
				double remaining_weight;
				remaining_weight = next_task.Collector.GetMaximumWeight ();
				int i = 0;
				while (i < to_be_executed.Count && remaining_weight > 0) {
					Task task;
					task = to_be_executed [i] as Task;
					remaining_weight -= task.Weight;
					++i;
				}
				if (i < to_be_executed.Count)
					to_be_executed.RemoveRange (i, to_be_executed.Count - i);
			}

			// Remove the tasks we are about to execute from our
			// master list.
			foreach (Task task in to_be_executed)
				tasks_by_tag.Remove (task.Tag);

			// Pulse our lock, in case anyone is waiting for it.
			Monitor.Pulse (big_lock);
		}

		// Now actually execute the set of tasks we found.

		status_builder.Length = 0;
		status_builder.Append ("Executing task");
		if (to_be_executed.Count > 1)
			status_builder.Append ('s');
		status_builder.Append ('\n');
		foreach (Task task in to_be_executed) {
			task.AppendToStringBuilder (status_builder);
			status_builder.Append ('\n');
		}
		status_str = status_builder.ToString ();

		DateTime start_time = DateTime.Now;
		if (pre_hook != null) {
			try {
				pre_hook ();
			} catch (Exception ex) {
				Logger.Log.Error ("Caught exception in pre_hook '{0}'", pre_hook);
				Logger.Log.Error (ex);
			}
		}
		foreach (Task task in to_be_executed) {
			task.DoTask ();
			++total_executed_task_count;
			++executed_task_count;
		}
		if (post_hook != null) {
			try {
				post_hook ();
			} catch (Exception ex) {
				Logger.Log.Error ("Caught exception in post_hook '{0}'", post_hook);
				Logger.Log.Error (ex);
			}
		}

		end_time_of_previous_task = DateTime.Now;
		duration_of_previous_task = (end_time_of_previous_task - start_time).TotalSeconds;
	}

	// Execute all shutdown tasks
	foreach (Task task in shutdown_task_queue)
		if (! task.Cancelled && ! task.Blocked)
			task.DoTask ();

	// Call Cleanup on all of our unexecuted tasks
	foreach (Task task in tasks_by_tag.Values)
		task.Cleanup ();

	if (Debug)
		Logger.Log.Debug ("Scheduler.Worker finished");
}
1021 //////////////////////////////////////////////////////////////////////////////
1023 private static StringBuilder cached_sb = new StringBuilder ();
// Sort a list of tasks from highest to lowest priority and append
// the textual description of each one to 'descriptions'.
private static void DescribeTasks (ArrayList tasks, ArrayList descriptions)
{
	tasks.Sort ();
	tasks.Reverse ();

	foreach (Task task in tasks) {
		cached_sb.Length = 0;
		task.AppendToStringBuilder (cached_sb);
		descriptions.Add (cached_sb.ToString ());
	}
}

// Take a snapshot of the scheduler's state: the queued tasks split
// into pending, future (trigger time not reached yet) and blocked
// ones, plus the executed-task counter and the worker's status string.
public SchedulerInformation GetCurrentStatus ()
{
	SchedulerInformation snapshot = new SchedulerInformation ();

	lock (big_lock) {

		ArrayList blocked = new ArrayList ();
		ArrayList future = new ArrayList ();
		ArrayList pending = new ArrayList ();

		DateTime now = DateTime.Now;
		foreach (Task task in tasks_by_tag.Values) {
			ArrayList bucket;
			if (task.Blocked)
				bucket = blocked;
			else if (task.TriggerTime > now)
				bucket = future;
			else
				bucket = pending;
			bucket.Add (task);
		}

		DescribeTasks (pending, snapshot.PendingTasks);
		DescribeTasks (future, snapshot.FutureTasks);
		DescribeTasks (blocked, snapshot.BlockedTasks);

		snapshot.TotalTaskCount = total_executed_task_count;
		snapshot.StatusString = status_str;
	}

	return snapshot;
}
// A serializable snapshot of the scheduler's state.  The task lists
// hold one human-readable string per task.
public class SchedulerInformation {

	// Number of tasks executed so far; -1 until filled in.
	[XmlAttribute]
	public int TotalTaskCount = -1;

	// What the scheduler's worker is currently doing.
	[XmlAttribute]
	public string StatusString;

	[XmlArray]
	[XmlArrayItem (ElementName="PendingTask", Type=typeof (string))]
	public ArrayList PendingTasks = new ArrayList ();

	[XmlArray]
	[XmlArrayItem (ElementName="FutureTask", Type=typeof (string))]
	public ArrayList FutureTasks = new ArrayList ();

	[XmlArray]
	[XmlArrayItem (ElementName="BlockedTask", Type=typeof (string))]
	public ArrayList BlockedTasks = new ArrayList ();

	// Render the snapshot as plain text: the counters first, then the
	// numbered pending tasks, then (only if non-empty) the future and
	// blocked tasks.
	public string ToHumanReadableString ()
	{
		// Build into a local StringBuilder.  A shared static builder
		// would be corrupted if two threads rendered snapshots at the
		// same time.
		StringBuilder sb = new StringBuilder ();

		sb.Append ("Scheduler:\n");
		sb.Append ("Count: ").Append (TotalTaskCount);
		sb.Append ('\n');

		if (StatusString != null)
			sb.Append ("Status: ").Append (StatusString).Append ('\n');

		int pos = 1;
		sb.Append ("\nPending Tasks:\n");
		if (PendingTasks != null && PendingTasks.Count > 0) {
			foreach (string task in PendingTasks) {
				sb.Append (pos).Append (' ').Append (task).Append ('\n');
				++pos;
			}
		} else
			sb.Append ("Scheduler queue is empty.\n");

		if (FutureTasks != null && FutureTasks.Count > 0) {
			sb.Append ("\nFuture Tasks:\n");
			foreach (string task in FutureTasks)
				sb.Append (task).Append ('\n');
		}

		if (BlockedTasks != null && BlockedTasks.Count > 0) {
			sb.Append ("\nBlocked Tasks:\n");
			foreach (string task in BlockedTasks)
				sb.Append (task).Append ('\n');
		}

		return sb.ToString ();
	}
}
#if false
// A small manual test harness for the scheduler (compiled out).
class TestTask : Scheduler.Task {

	private class TestCollector : Scheduler.ITaskCollector {

		public double GetMinimumWeight ()
		{
			return 0;
		}

		public double GetMaximumWeight ()
		{
			return 5;
		}

		public void PreTaskHook ()
		{
			Console.WriteLine ("+++ Pre-Task Hook");
		}

		public void PostTaskHook ()
		{
			Console.WriteLine ("+++ Post-Task Hook");
		}
	}

	protected override void DoTaskReal ()
	{
		Console.WriteLine ("Doing task '{0}' at {1}", Tag, DateTime.Now);
		Thread.Sleep (200);
		if (Tag == "Bar")
			Reschedule = true;
	}

	private static void BeginTaskGroup ()
	{
		Console.WriteLine ("--- Begin Task Group!");
	}

	private static void EndTaskGroup ()
	{
		Console.WriteLine ("--- End Task Group!");
	}

	public static void Main ()
	{
		Scheduler sched = Scheduler.Global;

		// Scheduler.Add rejects tasks that have no Source, so give
		// every test task the same opaque source object.
		object source = new object ();

		Scheduler.TaskGroup tg = Scheduler.NewTaskGroup ("foo",
								 new Scheduler.Hook (BeginTaskGroup),
								 new Scheduler.Hook (EndTaskGroup));

		sched.Start ();

		Scheduler.Task task;

		task = new TestTask ();
		task.Tag = "Foo";
		task.Source = source;
		task.AddTaskGroup (tg);
		task.Priority = Scheduler.Priority.Delayed;
		task.TriggerTime = DateTime.Now.AddSeconds (7);
		sched.Add (task);

		task = new TestTask ();
		task.Tag = "Bar";
		task.Source = source;
		task.AddTaskGroup (tg);
		task.Priority = Scheduler.Priority.Delayed;
		sched.Add (task);

		// Two batches of ten tasks, each batch with its own collector.
		Scheduler.ITaskCollector collector = null;
		for (int i = 0; i < 20; ++i) {
			if ((i % 10) == 0)
				collector = new TestCollector ();
			task = new TestTask ();
			task.Tag = String.Format ("Baboon {0}", i);
			task.Source = source;
			task.Collector = collector;
			task.Priority = Scheduler.Priority.Delayed;
			sched.Add (task);
		}

		// Keep the process alive while the worker thread runs.
		while (true) {
			Thread.Sleep (1000);
		}
	}
}
#endif