* Filters/FilterPackage.cs, Filters/FilterRPM.cs,
[beagle.git] / Util / Scheduler.cs
blob3a7f2b79250591e1c18e25c0a23f4f3cffc35fc5
1 //
2 // Scheduler.cs
3 //
4 // Copyright (C) 2004-2005 Novell, Inc.
5 //
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a
9 // copy of this software and associated documentation files (the "Software"),
10 // to deal in the Software without restriction, including without limitation
11 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 // and/or sell copies of the Software, and to permit persons to whom the
13 // Software is furnished to do so, subject to the following conditions:
15 // The above copyright notice and this permission notice shall be included in
16 // all copies or substantial portions of the Software.
18 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 // DEALINGS IN THE SOFTWARE.
27 using System;
28 using System.Collections;
29 using System.Text;
30 using System.Threading;
32 namespace Beagle.Util {
34 public class Scheduler {
36 // Fire an event if there are no tasks left to execute.
37 public delegate void EmptyQueueDelegate ();
38 public event EmptyQueueDelegate EmptyQueueEvent;
40 static public bool Debug = false;
42 public enum Priority {
44 Shutdown = 0, // Do it on shutdown
46 Idle = 1, // Execute only when the whole machine is idle
47 // Probably should be reserved for computationally-expensive stuff
48 // FIXME: These are not properly scheduled right now
50 Maintenance = 2, // Only execute when there are no lower-priority
51 // tasks from the same source to execute instead
53 Delayed = 3, // Do it soon
55 Immediate = 4, // Do it right now
58 public delegate void Hook ();
59 public delegate void TaskHook (Task task);
61 //////////////////////////////////////////////////////////////////////////////
63 public abstract class Task : IComparable {
65 private string tag = null;
66 private Priority priority = Priority.Delayed;
67 private int sub_priority = 0;
68 private DateTime trigger_time = DateTime.MinValue;
69 private DateTime timestamp; // when added to the scheduler
71 // Some metadata
72 public string Creator;
73 public string Description;
75 public object Source = null; // this is just an opaque identifier
77 public ITaskCollector Collector = null;
78 public double Weight = 1.0;
80 public bool Reschedule = false;
82 private ArrayList task_groups = null;
83 private TaskGroupPrivate child_task_group = null;
85 ///////////////////////////////
88 // The tag is the task's unique identifier
89 public string Tag {
91 get { return tag; }
93 set {
94 // Don't allow us to change the tag of a scheduled task
95 if (tag == null || scheduler == null)
96 tag = value;
97 else
98 throw new Exception ("Can't change tag of " + tag + "!");
102 public Priority Priority {
104 get { return priority; }
106 set {
107 if (priority != value) {
108 priority = value;
109 Recompute ();
114 public int SubPriority {
116 get { return sub_priority; }
118 set {
119 if (sub_priority != value) {
120 sub_priority = value;
121 Recompute ();
126 public DateTime TriggerTime {
128 get { return trigger_time; }
130 set {
131 if (trigger_time != value) {
132 trigger_time = value;
133 Recompute ();
138 public DateTime Timestamp {
139 get { return timestamp; }
142 ///////////////////////////////
144 public void AddTaskGroup (TaskGroup group)
146 if (task_groups == null)
147 task_groups = new ArrayList ();
148 task_groups.Add (group);
151 private void IncrementAllTaskGroups ()
153 if (task_groups != null) {
154 foreach (TaskGroupPrivate group in task_groups) {
155 if (! group.Finished)
156 group.Increment ();
161 private void DecrementAllTaskGroups ()
163 if (task_groups != null) {
164 foreach (TaskGroupPrivate group in task_groups) {
165 if (! group.Finished)
166 group.Decrement ();
171 private void TouchAllTaskGroups ()
173 if (task_groups != null) {
174 foreach (TaskGroupPrivate group in task_groups) {
175 if (! group.Finished)
176 group.Touch ();
181 ///////////////////////////////
183 private Scheduler scheduler = null;
185 public void Schedule (Scheduler scheduler)
187 // Increment the task groups the first
188 // time a task is scheduled.
189 if (this.scheduler == null)
190 IncrementAllTaskGroups ();
191 this.timestamp = DateTime.Now;
192 this.scheduler = scheduler;
193 this.cancelled = false;
196 private void Recompute ()
198 if (scheduler != null)
199 scheduler.Recompute ();
202 ///////////////////////////////
204 // A blocked task will not execute.
205 public bool Blocked {
206 get {
207 // Block the task if we have unexecuted children
208 return child_task_group != null && ! child_task_group.Finished;
212 ///////////////////////////////
214 private bool cancelled = false;
216 public bool Cancelled {
217 get { return cancelled; }
220 public void Cancel ()
222 if (! cancelled) {
223 DecrementAllTaskGroups ();
224 Cleanup (); // clean up after cancelled tasks
226 cancelled = true;
229 ///////////////////////////////
231 // The Task's count keeps track of how many
232 // times it has been executed.
234 private int count = 0;
236 public int Count {
237 get { return count; }
240 ///////////////////////////////
242 public void SpawnChild (Task child_task)
244 if (child_task_group == null)
245 child_task_group = new TaskGroupPrivate ("Children of " + Tag, null, null);
246 child_task.AddTaskGroup (child_task_group);
247 child_task.Source = this.Source;
248 scheduler.Add (child_task);
251 ///////////////////////////////
253 public void DoTask ()
255 if (! cancelled) {
256 if (Debug)
257 Logger.Log.Debug ("Starting task {0}", Tag);
258 child_task_group = null;
259 Reschedule = false;
260 TouchAllTaskGroups ();
262 Stopwatch sw = new Stopwatch ();
263 sw.Start ();
265 try {
266 DoTaskReal ();
267 } catch (Exception ex) {
268 Logger.Log.Warn ("Caught exception in DoTaskReal");
269 Logger.Log.Warn (" Tag: {0}", Tag);
270 Logger.Log.Warn (" Creator: {0}", Creator);
271 Logger.Log.Warn ("Description: {0}", Description);
272 Logger.Log.Warn (" Priority: {0} ({1})", Priority, SubPriority);
273 Logger.Log.Warn (ex);
275 sw.Stop ();
276 if (Debug)
277 Logger.Log.Debug ("Finished task {0} in {1}", Tag, sw);
279 if (Reschedule) {
280 ++count;
281 scheduler.Add (this); // re-add ourselves
282 } else {
283 DecrementAllTaskGroups ();
284 scheduler = null;
289 protected abstract void DoTaskReal ();
291 ///////////////////////////////
293 // Clean-up is called whenever we know that a task will never
294 // be executed. It is never called on tasks for who DoTaskReal
295 // has been called (except when rescheduled). Cleanup is also
296 // called when a task is cancelled.
298 public void Cleanup ()
300 try {
301 DoCleanup ();
302 } catch (Exception ex) {
303 Logger.Log.Warn ("Caught exception cleaning up task '{0}'", Tag);
304 Logger.Log.Warn (ex);
308 protected virtual void DoCleanup ()
310 // Do nothing by default
313 ///////////////////////////////
315 // Sort from lowest to highest priority
316 // FIXME: This does not define a total ordering
317 // on the set of all tasks, so use it with care.
318 public int CompareTo (object obj)
320 Task other = obj as Task;
321 if (other == null)
322 return 1;
324 Priority this_priority;
325 Priority other_priority;
327 this_priority = this.Priority;
328 other_priority = other.Priority;
330 // To other sources, Maintenance tasks looks like
331 // Delayed tasks.
332 if (this.Source != other.Source) {
333 if (this_priority == Priority.Maintenance)
334 this_priority = Priority.Delayed;
335 if (other_priority == Priority.Maintenance)
336 other_priority = Priority.Delayed;
339 int cmp;
340 cmp = (int)this_priority - (int)other_priority;
341 if (cmp != 0)
342 return cmp;
344 cmp = this.SubPriority - other.SubPriority;
345 if (cmp != 0)
346 return cmp;
348 // Tasks that were added to the scheduler earlier take
349 // precedence over those that were added later.
350 cmp = DateTime.Compare (other.Timestamp, this.Timestamp);
351 if (cmp != 0)
352 return cmp;
354 // Try to break any ties
355 return this.GetHashCode () - other.GetHashCode ();
358 public void AppendToStringBuilder (StringBuilder sb)
360 sb.Append (Priority).Append (' ').Append (SubPriority);
361 sb.Append (" (").Append (Timestamp).Append (")\n");
363 sb.Append (Tag).Append ('\n');
365 double t = (TriggerTime - DateTime.Now).TotalSeconds;
366 if (t > 0) {
367 if (t < 120)
368 sb.AppendFormat ("Hold for {0:0.00} seconds\n", t);
369 else {
370 sb.Append ("Hold until ").Append (TriggerTime);
371 sb.Append ('\n');
375 if (Creator != null)
376 sb.Append ("Creator: ").Append (Creator).Append ('\n');
378 if (Description != null)
379 sb.Append (Description).Append ('\n');
383 private class TaskHookWrapper : Task {
385 TaskHook hook;
387 public TaskHookWrapper (TaskHook hook)
389 this.hook = hook;
392 protected override void DoTaskReal ()
394 if (hook != null)
395 hook (this);
399 public static Task TaskFromHook (TaskHook hook)
401 return new TaskHookWrapper (hook);
404 //////////////////////////////////////////////////////////////////////////////
407 // Task Groups
410 public static TaskGroup NewTaskGroup (string name, Hook pre_hook, Hook post_hook)
412 return new TaskGroupPrivate (name, pre_hook, post_hook);
415 // The TaskGroup we hand back to the user is an interface that
416 // exposes minimal functionality.
417 public interface TaskGroup {
418 string Name { get; }
419 bool Finished { get; }
422 private class TaskGroupPrivate : TaskGroup {
423 private string name;
424 private int task_count = 0;
425 private bool touched = false;
426 private bool finished = false;
427 private Hook pre_hook;
428 private Hook post_hook;
430 public TaskGroupPrivate (string name,
431 Hook pre_hook,
432 Hook post_hook)
434 this.name = name;
435 this.pre_hook = pre_hook;
436 this.post_hook = post_hook;
439 public string Name {
440 get { return name; }
443 public bool Finished {
444 get { return finished; }
447 // Call this when a task is added to the task group.
448 public void Increment ()
450 if (finished)
451 throw new Exception ("Tried to increment a finished TaskGroup");
452 ++task_count;
455 // Call this when we execute a task in the task group.
456 public void Touch ()
458 if (finished)
459 throw new Exception ("Tried to touch a finished TaskGroup");
461 if (! touched) {
462 if (pre_hook != null) {
463 try {
464 pre_hook ();
465 } catch (Exception ex) {
466 Logger.Log.Warn ("Caught exception in pre_hook of task group '{0}'", Name);
467 Logger.Log.Warn (ex);
470 touched = true;
474 // Call this after a task in the task group is complete.
475 public void Decrement ()
477 if (finished)
478 throw new Exception ("Tried to decrement a finished TaskGroup");
480 --task_count;
481 // Only fire our post-hook if the pre-hook fired
482 // (or would have fired, had it been non-null)
483 if (task_count == 0 && touched) {
484 if (post_hook != null) {
485 try {
486 post_hook ();
487 } catch (Exception ex) {
488 Logger.Log.Warn ("Caught exception in post_hook of task group '{0}'", Name);
489 Logger.Log.Warn (ex);
492 finished = true;
497 //////////////////////////////////////////////////////////////////////////////
500 // Task Collector
502 // This is a mechanism for executing tasks in sets, possibly outside of
503 // priority order.
506 public interface ITaskCollector {
508 double GetMaximumWeight ();
510 void PreTaskHook ();
511 void PostTaskHook ();
514 //////////////////////////////////////////////////////////////////////////////
516 static private double global_delay = -1.0;
518 static Scheduler ()
520 string exercise;
521 exercise = Environment.GetEnvironmentVariable ("BEAGLE_EXERCISE_THE_DOG");
523 if (exercise != null) {
524 if (exercise.Length > 2 && exercise [0] == 't')
525 global_delay = Double.Parse (exercise.Substring (1));
526 else
527 global_delay = 0.0;
531 //////////////////////////////////////////////////////////////////////////////
533 static private Scheduler global = new Scheduler ();
535 static public Scheduler Global {
536 get { return global; }
539 //////////////////////////////////////////////////////////////////////////////
541 private object big_lock = new object ();
543 // FIXME: shutdown tasks should probably be ordered by something
544 private Queue shutdown_task_queue = new Queue ();
546 private Hashtable tasks_by_tag = new Hashtable ();
547 private int total_executed_task_count = 0;
549 public void Add (Task task)
551 if (task == null)
552 return;
554 if (task.Source == null)
555 throw new Exception ("Attempting to add Task with no source!");
557 Task old_task = null;
559 lock (big_lock) {
561 // Keep track of when immediate priority tasks are
562 // added so that we can throttle if the scheduler
563 // is being slammed with them.
564 if (task.Priority == Priority.Immediate) {
565 // Shift our times down by one
566 Array.Copy (last_immediate_times, 1, last_immediate_times, 0, 4);
567 last_immediate_times [4] = DateTime.Now;
570 old_task = tasks_by_tag [task.Tag] as Task;
572 task.Schedule (this);
574 // Re-adding the same task is basically a no-op --- we
575 // just update the timestamp and return.
576 if (old_task == task)
577 return;
579 if (Debug) {
580 Logger.Log.Debug ("Adding task");
581 Logger.Log.Debug ("Tag: {0}", task.Tag);
582 if (task.Description != null)
583 Logger.Log.Debug ("Desc: {0}", task.Description);
586 if (task.Priority == Priority.Shutdown)
587 shutdown_task_queue.Enqueue (task);
588 else
589 tasks_by_tag [task.Tag] = task;
591 Monitor.Pulse (big_lock);
594 // If we clobbered another task, call cancel on it.
595 // This happens after we release the lock, since
596 // cancellation could result in a task group post-hook
597 // being run.
598 if (old_task != null)
599 old_task.Cancel ();
602 public Task GetByTag (string tag)
604 lock (big_lock)
605 return tasks_by_tag [tag] as Task;
608 public bool ContainsByTag (string tag)
610 Task task = GetByTag (tag);
611 return task != null && !task.Cancelled;
614 public void Recompute ()
616 lock (big_lock)
617 Monitor.Pulse (big_lock);
620 //////////////////////////////////////////////////////////////////////////////
622 Thread thread = null;
623 public bool running = false;
625 public void Start ()
627 lock (this) {
628 if (thread != null)
629 return;
630 running = true;
631 thread = ExceptionHandlingThread.Start (new ThreadStart (Worker));
635 public void Stop ()
637 lock (big_lock) {
638 if (running) {
639 running = false;
640 Monitor.Pulse (big_lock);
646 // Delay Computations
648 // This code controls how we space out tasks
651 // FIXME: random magic constants
652 const double idle_threshold = 5.314159 * 60; // probably should be longer
653 const double idle_ramp_up_time = 5.271828 * 60; // probably should be longer
654 const double default_delayed_rate_factor = 9.03; // work about 1/10th of the time
655 const double default_idle_rate_factor = 2.097; // work about 1/3rd of the time
656 const double maximum_delay = 20; // never wait for more than 20s
657 const double min_throttled_delay = 1.5; // never wait less than this when throttled
658 const double min_overloaded_delay = 2.2; // never wait less than this when there are many tasks
659 const int task_overload_threshold = 60; // number of tasks to process before delaying
661 DateTime[] last_immediate_times = new DateTime [5];
663 // The return value and duration_of_previous_task are both measured in seconds.
664 private double ComputeDelay (Priority priority_of_next_task,
665 double duration_of_previous_task,
666 int executed_task_count)
668 if (global_delay >= 0.0)
669 return global_delay;
671 double rate_factor;
673 rate_factor = 2.0;
675 // Do everything faster the longer we are idle.
676 double idle_time = SystemInformation.InputIdleTime;
677 double idle_scale = 1.0;
678 bool is_idle = false;
679 bool need_throttle = false;
681 // Never speed up if we are using the battery.
682 if (idle_time > idle_threshold && ! SystemInformation.UsingBattery) {
683 is_idle = true;
684 double t = (idle_time - idle_threshold) / idle_ramp_up_time;
685 idle_scale = (1 - Math.Min (t, 1.0));
688 switch (priority_of_next_task) {
690 case Priority.Immediate:
691 rate_factor = 0;
693 if (last_immediate_times [0] != DateTime.MinValue) {
694 TimeSpan last_add_delta = DateTime.Now.Subtract (last_immediate_times [4]);
696 // If less than a second has gone by since the
697 // last immediate task was added, there is
698 // still a torrent of events coming in, and we
699 // may need to throttle.
700 if (last_add_delta.TotalSeconds <= 1) {
701 TimeSpan between_add_delta = last_immediate_times [4].Subtract (last_immediate_times [0]);
703 // At least 5 immediate tasks have been
704 // added in the last second. We
705 // definitely need to throttle.
706 if (between_add_delta.TotalSeconds <= 1) {
707 need_throttle = true;
708 rate_factor = idle_scale * default_idle_rate_factor;
713 // If we've processed many tasks since the last
714 // time we took a break, ignore the priority and set a
715 // delay equivalent to Priority.Delayed.
716 if (!is_idle && executed_task_count >= task_overload_threshold)
717 rate_factor = idle_scale * default_delayed_rate_factor;
719 break;
721 case Priority.Delayed:
722 rate_factor = idle_scale * default_delayed_rate_factor;
723 break;
725 case Priority.Idle:
726 rate_factor = idle_scale * default_idle_rate_factor;
727 break;
731 // FIXME: we should do something more sophisticated than this
732 // with the load average.
733 // Random numbers galore!
734 double load_average = SystemInformation.LoadAverageOneMinute;
735 if (load_average > 3.001)
736 rate_factor *= 5.002;
737 else if (load_average > 1.5003)
738 rate_factor *= 2.004;
740 double delay = rate_factor * duration_of_previous_task;
742 // space out delayed tasks a bit when we aren't idle
743 if (! is_idle
744 && priority_of_next_task == Priority.Delayed
745 && delay < 0.5)
746 delay = 0.5;
748 if (delay > maximum_delay)
749 delay = maximum_delay;
751 // If we need to throttle, make sure we don't delay less than
752 // a second and some.
753 if (need_throttle && delay < min_throttled_delay)
754 delay = min_throttled_delay;
756 // If we're not idle and we've just processed more
757 // than a certain number of events, take a break.
758 if (! is_idle
759 && executed_task_count >= task_overload_threshold
760 && delay < min_overloaded_delay)
761 delay = min_overloaded_delay;
763 return delay;
767 // The main loop
770 // A convenience function. There should be a
771 // constructor to TimeSpan that does this.
772 private static TimeSpan TimeSpanFromSeconds (double t)
774 // Wait barfs if you hand it a negative TimeSpan,
775 // so we are paranoid;
776 if (t < 0.001)
777 t = 0;
779 // 1 tick = 100 nanoseconds
780 long ticks = (long) (t * 1.0e+7);
781 return new TimeSpan (ticks);
784 private string status_str = null;
786 private void Worker ()
788 DateTime end_time_of_previous_task = DateTime.MinValue;
789 double duration_of_previous_task = 0.0;
791 Hook pre_hook = null;
792 Hook post_hook = null;
793 ArrayList to_be_executed = new ArrayList ();
794 Hashtable max_priority_by_source = new Hashtable ();
795 int executed_task_count = 0;
796 StringBuilder status_builder = new StringBuilder ();
798 while (running) {
800 status_str = "Finding next task to execute";
802 lock (big_lock) {
804 // If there are no pending tasks, wait
805 // on our lock and then re-start our
806 // while loop
807 if (tasks_by_tag.Count == 0) {
808 if (EmptyQueueEvent != null)
809 EmptyQueueEvent ();
810 status_str = "Waiting on empty queue";
811 Monitor.Wait (big_lock);
812 executed_task_count = 0;
813 continue;
816 // Walk across our list of tasks and find
817 // the next one to execute.
818 DateTime now = DateTime.Now;
819 DateTime next_trigger_time = DateTime.MaxValue;
821 // Make a first pass over our tasks, finding the
822 // highest-priority item per source.
823 max_priority_by_source.Clear ();
824 foreach (Task task in tasks_by_tag.Values) {
825 if (task.Blocked || task.TriggerTime >= now)
826 continue;
827 if (max_priority_by_source.Contains (task.Source)) {
828 Priority p = (Priority) max_priority_by_source [task.Source];
829 if (p < task.Priority)
830 max_priority_by_source [task.Source] = task.Priority;
831 } else {
832 max_priority_by_source [task.Source] = task.Priority;
836 // Now make a second pass over the tasks and find
837 // the highest-priority item. We use the information
838 // from the first pass to correctly prioritize maintenance tasks.
839 Task next_task = null;
840 foreach (Task task in tasks_by_tag.Values) {
841 if (task.Blocked)
842 continue;
843 if (task.TriggerTime >= now) {
844 if (task.TriggerTime < next_trigger_time)
845 next_trigger_time = task.TriggerTime;
846 continue;
849 // If this is a maintenance task and there is a high-priority
850 // task from the same source, skip it.
851 if (task.Priority == Priority.Maintenance) {
852 Priority p = (Priority) max_priority_by_source [task.Source];
853 if (p > task.Priority)
854 continue;
857 if (task.TriggerTime < now) {
858 if (next_task == null || next_task.CompareTo (task) < 0)
859 next_task = task;
863 // If we didn't find a task, wait for the next trigger-time
864 // and then re-start our while loop.
865 if (next_task == null) {
866 if (next_trigger_time == DateTime.MaxValue) {
867 status_str = "Waiting for an unblocked task";
868 Monitor.Wait (big_lock);
869 } else {
870 status_str = "Waiting for the next trigger time";
871 Monitor.Wait (big_lock, next_trigger_time - now);
873 executed_task_count = 0;
874 continue;
877 // If we did find a task, do we want to execute it right now?
878 // Or should we wait a bit?
880 // How should we space things out?
881 double delay = 0;
882 delay = ComputeDelay (next_task.Priority, duration_of_previous_task, executed_task_count);
883 delay = Math.Min (delay, (next_trigger_time - now).TotalSeconds);
885 // Adjust by the time that has actually elapsed since the
886 // last task.
887 delay -= (now - end_time_of_previous_task).TotalSeconds;
889 // If we still need to wait a bit longer, wait for the appropriate
890 // amount of time and then re-start our while loop.
891 if (delay > 0.001) {
892 status_str = "Waiting for next task.";
893 Monitor.Wait (big_lock, TimeSpanFromSeconds (delay));
894 executed_task_count = 0;
895 continue;
899 // If we've made it to this point, it is time to start
900 // executing our selected task.
903 to_be_executed.Clear ();
905 if (next_task.Collector == null) {
907 to_be_executed.Add (next_task);
909 } else {
911 pre_hook = new Hook (next_task.Collector.PreTaskHook);
912 post_hook = new Hook (next_task.Collector.PostTaskHook);
914 // Find all eligible tasks with the same collector,
915 // and add them to the collection list.
916 now = DateTime.Now;
917 foreach (Task task in tasks_by_tag.Values)
918 if (task != next_task
919 && task.Collector == next_task.Collector
920 && !task.Blocked
921 && task.TriggerTime < now)
922 to_be_executed.Add (task);
924 // Order the tasks from highest to lowest priority.
925 // Our original task will always be the first item
926 // in the resulting array.
927 to_be_executed.Sort ();
928 to_be_executed.Add (next_task);
929 to_be_executed.Reverse ();
931 // Now find how many tasks can be executed before we
932 // exceed the collector's maximum weight. If necessary,
933 // prune the list of tasks.
934 double remaining_weight;
935 remaining_weight = next_task.Collector.GetMaximumWeight ();
936 int i = 0;
937 while (i < to_be_executed.Count && remaining_weight > 0) {
938 Task task;
939 task = to_be_executed [i] as Task;
940 remaining_weight -= task.Weight;
941 ++i;
943 if (i < to_be_executed.Count)
944 to_be_executed.RemoveRange (i, to_be_executed.Count - i);
947 // Remove the tasks we are about to execute from our
948 // master list.
949 foreach (Task task in to_be_executed)
950 tasks_by_tag.Remove (task.Tag);
952 // Pulse our lock, in case anyone is waiting for it.
953 Monitor.Pulse (big_lock);
956 // Now actually execute the set of tasks we found.
958 status_builder.Length = 0;
959 status_builder.Append ("Executing task");
960 if (to_be_executed.Count > 1)
961 status_builder.Append ('s');
962 status_builder.Append ('\n');
963 foreach (Task task in to_be_executed) {
964 task.AppendToStringBuilder (status_builder);
965 status_builder.Append ('\n');
967 status_str = status_builder.ToString ();
969 DateTime start_time = DateTime.Now;
970 if (pre_hook != null) {
971 try {
972 pre_hook ();
973 } catch (Exception ex) {
974 Logger.Log.Error ("Caught exception in pre_hook '{0}'", pre_hook);
975 Logger.Log.Error (ex);
978 foreach (Task task in to_be_executed) {
979 task.DoTask ();
980 ++total_executed_task_count;
981 ++executed_task_count;
983 if (post_hook != null) {
984 try {
985 post_hook ();
986 } catch (Exception ex) {
987 Logger.Log.Error ("Caught exception in post_hook '{0}'", post_hook);
988 Logger.Log.Error (ex);
992 end_time_of_previous_task = DateTime.Now;
993 duration_of_previous_task = (end_time_of_previous_task - start_time).TotalSeconds;
996 // Execute all shutdown tasks
997 foreach (Task task in shutdown_task_queue)
998 if (! task.Cancelled && ! task.Blocked)
999 task.DoTask ();
1001 // Call Cleanup on all of our unexecuted tasks
1002 foreach (Task task in tasks_by_tag.Values)
1003 task.Cleanup ();
1005 if (Debug)
1006 Logger.Log.Debug ("Scheduler.Worker finished");
1009 //////////////////////////////////////////////////////////////////////////////
1011 public string GetHumanReadableStatus ()
1013 StringBuilder sb = new StringBuilder ();
1015 lock (big_lock) {
1017 ArrayList blocked_tasks = new ArrayList ();
1018 ArrayList future_tasks = new ArrayList ();
1019 ArrayList pending_tasks = new ArrayList ();
1021 DateTime now = DateTime.Now;
1022 foreach (Task task in tasks_by_tag.Values) {
1023 if (task.Blocked)
1024 blocked_tasks.Add (task);
1025 else if (task.TriggerTime > now)
1026 future_tasks.Add (task);
1027 else
1028 pending_tasks.Add (task);
1031 blocked_tasks.Sort ();
1032 blocked_tasks.Reverse ();
1034 future_tasks.Sort ();
1035 future_tasks.Reverse ();
1037 pending_tasks.Sort ();
1038 pending_tasks.Reverse ();
1040 sb.Append ("Scheduler:\n");
1041 sb.Append ("Count: ").Append (total_executed_task_count);
1042 sb.Append ('\n');
1044 if (status_str != null)
1045 sb.Append ("Status: ").Append (status_str).Append ('\n');
1047 int pos = 1;
1048 sb.Append ("\nPending Tasks:\n");
1049 foreach (Task task in pending_tasks) {
1050 sb.Append (pos).Append (' ');
1051 task.AppendToStringBuilder (sb);
1052 sb.Append ('\n');
1053 ++pos;
1055 if (pos == 1)
1056 sb.Append ("Scheduler queue is empty.\n");
1059 if (future_tasks.Count > 0) {
1060 sb.Append ("\nFuture Tasks:\n");
1061 foreach (Task task in future_tasks) {
1062 task.AppendToStringBuilder (sb);
1063 sb.Append ('\n');
1067 if (blocked_tasks.Count > 0) {
1068 sb.Append ("\nBlocked Tasks:\n");
1069 foreach (Task task in blocked_tasks) {
1070 task.AppendToStringBuilder (sb);
1071 sb.Append ('\n');
1076 sb.Append ('\n');
1077 return sb.ToString ();
1081 #if false
1082 class TestTask : Scheduler.Task {
1084 private class TestCollector : Scheduler.ITaskCollector {
1086 public double GetMinimumWeight ()
1088 return 0;
1091 public double GetMaximumWeight ()
1093 return 5;
1096 public void PreTaskHook ()
1098 Console.WriteLine ("+++ Pre-Task Hook");
1101 public void PostTaskHook ()
1103 Console.WriteLine ("+++ Post-Task Hook");
1107 protected override void DoTaskReal ()
1109 Console.WriteLine ("Doing task '{0}' at {1}", Tag, DateTime.Now);
1110 Thread.Sleep (200);
1111 if (Tag == "Bar")
1112 Reschedule = true;
1115 static void BeginTaskGroup ()
1117 Console.WriteLine ("--- Begin Task Group!");
1120 static void EndTaskGroup ()
1122 Console.WriteLine ("--- End Task Group!");
1125 static void Main ()
1127 Scheduler sched = Scheduler.Global;
1129 Scheduler.TaskGroup tg = Scheduler.NewTaskGroup ("foo",
1130 new Scheduler.Hook (BeginTaskGroup),
1131 new Scheduler.Hook (EndTaskGroup));
1133 sched.Start ();
1135 Scheduler.Task task;
1137 task = new TestTask ();
1138 task.Tag = "Foo";
1139 task.AddTaskGroup (tg);
1140 task.Priority = Scheduler.Priority.Delayed;
1141 task.TriggerTime = DateTime.Now.AddSeconds (7);
1142 sched.Add (task);
1144 task = new TestTask ();
1145 task.Tag = "Bar";
1146 task.AddTaskGroup (tg);
1147 task.Priority = Scheduler.Priority.Delayed;
1148 sched.Add (task);
1150 Scheduler.ITaskCollector collector = null;
1151 for (int i = 0; i < 20; ++i) {
1152 if ((i % 10) == 0)
1153 collector = new TestCollector ();
1154 task = new TestTask ();
1155 task.Tag = String.Format ("Baboon {0}", i);
1156 task.Collector = collector;
1157 task.Priority = Scheduler.Priority.Delayed;
1158 sched.Add (task);
1161 while (true) {
1162 Thread.Sleep (1000);
1166 #endif