Some more fixes wrt child-indexables. Namely, fix proper handling of child indexables...
[beagle.git] / Util / Scheduler.cs
blobc7c65ac36084cc59c53297aea8d7d66b7d91e3db
1 //
2 // Scheduler.cs
3 //
4 // Copyright (C) 2004-2005 Novell, Inc.
5 //
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a
9 // copy of this software and associated documentation files (the "Software"),
10 // to deal in the Software without restriction, including without limitation
11 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 // and/or sell copies of the Software, and to permit persons to whom the
13 // Software is furnished to do so, subject to the following conditions:
15 // The above copyright notice and this permission notice shall be included in
16 // all copies or substantial portions of the Software.
18 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 // DEALINGS IN THE SOFTWARE.
27 using System;
28 using System.Collections;
29 using System.Text;
30 using System.Threading;
31 using System.Xml;
32 using System.Xml.Serialization;
34 namespace Beagle.Util {
36 public class Scheduler {
38 // Fire an event if there are no tasks left to execute.
39 public delegate void EmptyQueueDelegate ();
40 public event EmptyQueueDelegate EmptyQueueEvent;
42 public static bool Debug = false;
44 public enum Priority {
46 Shutdown = 0, // Do it on shutdown
48 Idle = 1, // Execute only when the whole machine is idle
49 // Probably should be reserved for computationally-expensive stuff
50 // FIXME: These are not properly scheduled right now
52 Maintenance = 2, // Only execute when there are no higher-priority
53 // tasks from the same source to execute instead
55 Delayed = 3, // Do it soon
57 Immediate = 4, // Do it right now
60 public delegate void Hook ();
61 public delegate void TaskHook (Task task);
63 //////////////////////////////////////////////////////////////////////////////
65 public abstract class Task : IComparable {
67 private string tag = null;
68 private Priority priority = Priority.Delayed;
69 private int sub_priority = 0;
70 private DateTime trigger_time = DateTime.MinValue;
71 private DateTime timestamp; // when added to the scheduler
73 // Some metadata
74 public string Creator;
75 public string Description;
77 public object Source = null; // this is just an opaque identifier
79 public ITaskCollector Collector = null;
80 public double Weight = 1.0;
82 public bool Reschedule = false;
84 private ArrayList task_groups = null;
85 private TaskGroupPrivate child_task_group = null;
87 ///////////////////////////////
90 // The tag is the task's unique identifier
91 public string Tag {
93 get { return tag; }
95 set {
96 // Don't allow us to change the tag of a scheduled task
97 if (tag == null || scheduler == null)
98 tag = value;
99 else
100 throw new Exception ("Can't change tag of " + tag + "!");
104 public Priority Priority {
106 get { return priority; }
108 set {
109 if (priority != value) {
110 priority = value;
111 Recompute ();
116 public int SubPriority {
118 get { return sub_priority; }
120 set {
121 if (sub_priority != value) {
122 sub_priority = value;
123 Recompute ();
128 public DateTime TriggerTime {
130 get { return trigger_time; }
132 set {
133 if (trigger_time != value) {
134 trigger_time = value;
135 Recompute ();
140 public DateTime Timestamp {
141 get { return timestamp; }
144 ///////////////////////////////
146 public void AddTaskGroup (TaskGroup group)
148 if (task_groups == null)
149 task_groups = new ArrayList ();
150 task_groups.Add (group);
153 private void IncrementAllTaskGroups ()
155 if (task_groups != null) {
156 foreach (TaskGroupPrivate group in task_groups) {
157 if (! group.Finished)
158 group.Increment ();
163 private void DecrementAllTaskGroups ()
165 if (task_groups != null) {
166 foreach (TaskGroupPrivate group in task_groups) {
167 if (! group.Finished)
168 group.Decrement ();
173 private void TouchAllTaskGroups ()
175 if (task_groups != null) {
176 foreach (TaskGroupPrivate group in task_groups) {
177 if (! group.Finished)
178 group.Touch ();
183 ///////////////////////////////
185 private Scheduler scheduler = null;
// Attach this task to the given scheduler and stamp it with the
// current time.  Task groups are only incremented when the task
// was not already attached to a scheduler; any earlier cancellation
// is cleared.
public void Schedule (Scheduler scheduler)
{
	bool first_time = (this.scheduler == null);
	if (first_time)
		IncrementAllTaskGroups ();

	this.cancelled = false;
	this.scheduler = scheduler;
	this.timestamp = DateTime.Now;
}
198 private void Recompute ()
200 if (scheduler != null)
201 scheduler.Recompute ();
204 ///////////////////////////////
206 // A blocked task will not execute.
207 public bool Blocked {
208 get {
209 // Block the task if we have unexecuted children
210 return child_task_group != null && ! child_task_group.Finished;
214 ///////////////////////////////
216 private bool cancelled = false;
218 public bool Cancelled {
219 get { return cancelled; }
// Mark this task as cancelled.  The first cancellation releases
// the task's groups and runs Cleanup (); later calls do nothing.
public void Cancel ()
{
	if (cancelled)
		return;

	DecrementAllTaskGroups ();
	Cleanup (); // clean up after cancelled tasks
	cancelled = true;
}
231 ///////////////////////////////
233 // The Task's count keeps track of how many
234 // times it has been executed.
236 private int count = 0;
238 public int Count {
239 get { return count; }
242 ///////////////////////////////
// Add child_task to the scheduler as a child of this task.  The
// child inherits our Source and joins our private child task group;
// this task reports Blocked until that group is finished.
//
// Throws ArgumentNullException if child_task is null, and
// InvalidOperationException if this task is not currently attached
// to a scheduler.  Both are checked before anything is modified.
public void SpawnChild (Task child_task)
{
	if (child_task == null)
		throw new ArgumentNullException ("child_task");
	if (scheduler == null)
		throw new InvalidOperationException ("Can't spawn a child of unscheduled task " + Tag);

	if (child_task_group == null)
		child_task_group = new TaskGroupPrivate ("Children of " + Tag, null, null);
	child_task.AddTaskGroup (child_task_group);
	child_task.Source = this.Source;
	scheduler.Add (child_task);
}
253 ///////////////////////////////
// Execute the task, unless it has been cancelled.
//
// Exceptions thrown by DoTaskReal () are logged and swallowed so
// that one bad task cannot take down the caller.  Afterwards the
// task either re-adds itself to its scheduler (if DoTaskReal set
// Reschedule) or releases its task groups and detaches from the
// scheduler.
public void DoTask ()
{
	if (cancelled)
		return;

	if (Debug)
		Logger.Log.Debug ("Starting task {0}", Tag);

	// Start with a clean slate: drop any child group from an
	// earlier run and require DoTaskReal to ask for rescheduling.
	child_task_group = null;
	Reschedule = false;
	TouchAllTaskGroups ();

	Stopwatch sw = new Stopwatch ();
	sw.Start ();

	try {
		DoTaskReal ();
	} catch (Exception ex) {
		// Each value gets its own placeholder; previously every
		// line printed argument {0} (the tag).
		Logger.Log.Warn (ex,
				 "Caught exception in DoTaskReal\n" +
				 " Tag: {0}\n" +
				 " Creator: {1}\n" +
				 "Description: {2}\n" +
				 " Priority: {3} ({4})",
				 Tag, Creator, Description, Priority, SubPriority);
	}

	sw.Stop ();
	if (Debug)
		Logger.Log.Debug ("Finished task {0} in {1}", Tag, sw);

	if (Reschedule) {
		++count;
		if (Debug)
			Log.Debug ("Rescheduling task {0}", Tag);
		scheduler.Add (this); // re-add ourselves
	} else {
		DecrementAllTaskGroups ();
		scheduler = null;
	}
}
294 protected abstract void DoTaskReal ();
296 ///////////////////////////////
298 // Clean-up is called whenever we know that a task will never
299 // be executed. It is never called on tasks for which DoTaskReal
300 // has been called (except when rescheduled). Cleanup is also
301 // called when a task is cancelled.
303 public void Cleanup ()
305 try {
306 DoCleanup ();
307 } catch (Exception ex) {
308 Logger.Log.Warn (ex, "Caught exception cleaning up task '{0}'", Tag);
312 protected virtual void DoCleanup ()
314 // Do nothing by default
317 ///////////////////////////////
319 // Sort from lowest to highest priority
320 // FIXME: This does not define a total ordering
321 // on the set of all tasks, so use it with care.
// Compare two tasks, sorting from lowest to highest priority.
// Returns a negative number if this task should run after obj,
// a positive number if it should run before it, and zero on a tie.
// Anything that is not a Task (including null) sorts below us.
//
// Only the sign of the result is meaningful.
public int CompareTo (object obj)
{
	Task other = obj as Task;
	if (other == null)
		return 1;

	Priority this_priority = this.Priority;
	Priority other_priority = other.Priority;

	// To other sources, Maintenance tasks look like
	// Delayed tasks.
	if (this.Source != other.Source) {
		if (this_priority == Priority.Maintenance)
			this_priority = Priority.Delayed;
		if (other_priority == Priority.Maintenance)
			other_priority = Priority.Delayed;
	}

	// The enum values are tiny, so plain subtraction is safe here.
	int cmp = (int) this_priority - (int) other_priority;
	if (cmp != 0)
		return cmp;

	// Sub-priorities are arbitrary ints; subtracting them could
	// overflow and flip the sign, so compare instead.
	cmp = this.SubPriority.CompareTo (other.SubPriority);
	if (cmp != 0)
		return cmp;

	// Tasks that were added to the scheduler earlier take
	// precedence over those that were added later.
	cmp = DateTime.Compare (other.Timestamp, this.Timestamp);
	if (cmp != 0)
		return cmp;

	// Try to break any ties.  Hash codes span the whole int range,
	// so again compare rather than subtract.
	return this.GetHashCode ().CompareTo (other.GetHashCode ());
}
// Append a multi-line, human-readable description of this task
// to sb: priority and timestamp, tag, any remaining hold time,
// and the optional creator and description.
public void AppendToStringBuilder (StringBuilder sb)
{
	sb.Append (Priority);
	sb.Append (' ');
	sb.Append (SubPriority);
	sb.Append (" (");
	sb.Append (Timestamp);
	sb.Append (")\n");

	sb.Append (Tag);
	sb.Append ('\n');

	// Only mention the trigger time if it is still in the future.
	double seconds_left = (TriggerTime - DateTime.Now).TotalSeconds;
	if (seconds_left > 0) {
		if (seconds_left >= 120) {
			sb.Append ("Hold until ");
			sb.Append (TriggerTime);
			sb.Append ('\n');
		} else {
			sb.AppendFormat ("Hold for {0:0.00} seconds\n", seconds_left);
		}
	}

	if (Creator != null) {
		sb.Append ("Creator: ");
		sb.Append (Creator);
		sb.Append ('\n');
	}

	if (Description != null) {
		sb.Append (Description);
		sb.Append ('\n');
	}
}
387 private class TaskHookWrapper : Task {
389 TaskHook hook;
391 public TaskHookWrapper (TaskHook hook)
393 this.hook = hook;
396 protected override void DoTaskReal ()
398 if (hook != null)
399 hook (this);
403 public static Task TaskFromHook (TaskHook hook)
405 return new TaskHookWrapper (hook);
408 //////////////////////////////////////////////////////////////////////////////
411 // Task Groups
414 public static TaskGroup NewTaskGroup (string name, Hook pre_hook, Hook post_hook)
416 return new TaskGroupPrivate (name, pre_hook, post_hook);
419 // The TaskGroup we hand back to the user is an interface that
420 // exposes minimal functionality.
421 public interface TaskGroup {
422 string Name { get; }
423 bool Finished { get; }
// The concrete task group.  It counts its outstanding tasks, runs
// the pre-hook the first time it is touched, and runs the post-hook
// and marks itself finished when the count drops to zero after
// having been touched.
private class TaskGroupPrivate : TaskGroup {
	private string name;
	private int task_count = 0;
	private bool touched = false;
	private bool finished = false;
	private Hook pre_hook;
	private Hook post_hook;

	public TaskGroupPrivate (string name,
				 Hook   pre_hook,
				 Hook   post_hook)
	{
		this.name = name;
		this.pre_hook = pre_hook;
		this.post_hook = post_hook;
	}

	public string Name {
		get { return name; }
	}

	public bool Finished {
		get { return finished; }
	}

	// Call this when a task is added to the task group.
	public void Increment ()
	{
		if (finished)
			throw new Exception ("Tried to increment a finished TaskGroup");

		task_count = task_count + 1;
	}

	// Call this when we execute a task in the task group.
	// The pre-hook runs on the first call only; exceptions
	// it throws are logged and otherwise ignored.
	public void Touch ()
	{
		if (finished)
			throw new Exception ("Tried to touch a finished TaskGroup");

		if (touched)
			return;

		if (pre_hook != null) {
			try {
				pre_hook ();
			} catch (Exception ex) {
				Logger.Log.Warn (ex, "Caught exception in pre_hook of task group '{0}'", Name);
			}
		}
		touched = true;
	}

	// Call this after a task in the task group is complete.
	public void Decrement ()
	{
		if (finished)
			throw new Exception ("Tried to decrement a finished TaskGroup");

		task_count = task_count - 1;

		// Only fire our post-hook if the pre-hook fired
		// (or would have fired, had it been non-null)
		if (task_count != 0 || ! touched)
			return;

		if (post_hook != null) {
			try {
				post_hook ();
			} catch (Exception ex) {
				Logger.Log.Warn (ex, "Caught exception in post_hook of task group '{0}'", Name);
			}
		}
		finished = true;
	}
}
499 //////////////////////////////////////////////////////////////////////////////
502 // Task Collector
504 // This is a mechanism for executing tasks in sets, possibly outside of
505 // priority order.
508 public interface ITaskCollector {
510 double GetMaximumWeight ();
512 void PreTaskHook ();
513 void PostTaskHook ();
516 //////////////////////////////////////////////////////////////////////////////
518 private static double global_delay = -1.0;
// Read the BEAGLE_EXERCISE_THE_DOG environment variable once, at
// type-initialization time.  When it is set, global_delay becomes
// non-negative and ComputeDelay returns it unconditionally:
//   - a value longer than two characters starting with 't' ("t1.5")
//     selects the number after the 't';
//   - anything else selects no delay at all.
// A malformed number falls back to no delay rather than throwing,
// since an exception here would make the whole type unusable.
static Scheduler ()
{
	string exercise = Environment.GetEnvironmentVariable ("BEAGLE_EXERCISE_THE_DOG");
	if (exercise == null)
		return;

	global_delay = 0.0;

	if (exercise.Length > 2 && exercise [0] == 't') {
		try {
			global_delay = Double.Parse (exercise.Substring (1));
		} catch (FormatException) {
			// Not a number: keep the zero-delay default.
		} catch (OverflowException) {
			// Out of range: keep the zero-delay default.
		}
	}
}
533 //////////////////////////////////////////////////////////////////////////////
535 private static Scheduler global = new Scheduler ();
537 public static Scheduler Global {
538 get { return global; }
541 //////////////////////////////////////////////////////////////////////////////
543 private object big_lock = new object ();
545 // FIXME: shutdown tasks should probably be ordered by something
546 private Queue shutdown_task_queue = new Queue ();
548 private Hashtable tasks_by_tag = new Hashtable ();
549 private int total_executed_task_count = 0;
// Add a task to the scheduler.  A null task is ignored; a task
// without a Source is rejected with an exception.  If a different
// task with the same tag was already queued, the new task replaces
// it and the old one is cancelled.
public void Add (Task task)
{
	if (task == null)
		return;

	if (task.Source == null)
		throw new Exception ("Attempting to add Task with no source!");

	Task displaced = null;

	lock (big_lock) {

		// Remember when the last five immediate-priority tasks
		// arrived, so that ComputeDelay can throttle if the
		// scheduler is being flooded with them.
		if (task.Priority == Priority.Immediate) {
			// Slide the window down by one slot
			Array.Copy (last_immediate_times, 1, last_immediate_times, 0, 4);
			last_immediate_times [4] = DateTime.Now;
		}

		displaced = tasks_by_tag [task.Tag] as Task;

		task.Schedule (this);

		// Adding a task that is already queued only refreshes
		// its timestamp.
		if (displaced == task)
			return;

		if (Debug) {
			Logger.Log.Debug ("Adding task {0}", task.Tag);
			if (task.Description != null)
				Logger.Log.Debug (" Desc: {0}", task.Description);
		}

		if (task.Priority != Priority.Shutdown)
			tasks_by_tag [task.Tag] = task;
		else
			shutdown_task_queue.Enqueue (task);

		Monitor.Pulse (big_lock);
	}

	// Cancel the task we replaced, if any.  This is done outside
	// the lock because cancelling can run a task group's post-hook.
	if (displaced != null)
		displaced.Cancel ();
}
603 public Task GetByTag (string tag)
605 lock (big_lock)
606 return tasks_by_tag [tag] as Task;
609 public bool ContainsByTag (string tag)
611 Task task = GetByTag (tag);
612 return task != null && !task.Cancelled;
615 public void Recompute ()
617 lock (big_lock)
618 Monitor.Pulse (big_lock);
621 //////////////////////////////////////////////////////////////////////////////
623 private Thread thread = null;
624 public bool running = false;
625 private static bool shutdown_requested = false;
// Start the worker thread.  Does nothing if a shutdown has been
// requested or a worker thread already exists.
//
// This takes big_lock -- the same lock Stop () uses -- so that the
// thread/running fields are always read and written under one lock.
public void Start ()
{
	lock (big_lock) {
		if (shutdown_requested || thread != null)
			return;
		running = true;
		thread = ExceptionHandlingThread.Start (new ThreadStart (Worker));
	}
}
637 public void Stop (bool to_shutdown)
639 lock (big_lock) {
640 shutdown_requested = to_shutdown;
642 if (running) {
643 running = false;
644 thread = null;
645 status_str = "Stopped";
646 Monitor.Pulse (big_lock);
651 public void Stop ()
653 Stop (false);
657 // Delay Computations
659 // This code controls how we space out tasks
662 // FIXME: random magic constants
663 const double idle_threshold = 5.314159 * 60; // probably should be longer
664 const double idle_ramp_up_time = 5.271828 * 60; // probably should be longer
665 const double default_delayed_rate_factor = 9.03; // work about 1/10th of the time
666 const double default_idle_rate_factor = 2.097; // work about 1/3rd of the time
667 const double maximum_delay = 20; // never wait for more than 20s
668 const double min_throttled_delay = 1.5; // never wait less than this when throttled
669 const double min_overloaded_delay = 2.2; // never wait less than this when there are many tasks
670 const int task_overload_threshold = 60; // number of tasks to process before delaying
672 DateTime[] last_immediate_times = new DateTime [5];
674 // The return value and duration_of_previous_task are both measured in seconds.
// Work out how long to pause before running the next task.
// The return value and duration_of_previous_task are both measured
// in seconds.  The delay is a multiple of the previous task's
// duration; the multiplier depends on the next task's priority,
// user idle time, recent bursts of immediate tasks, the number of
// tasks executed without a break, and the load average.
private double ComputeDelay (Priority priority_of_next_task,
			     double   duration_of_previous_task,
			     int      executed_task_count)
{
	// A globally configured delay overrides everything else.
	if (global_delay >= 0.0)
		return global_delay;

	double factor = 2.0;

	// Do everything faster the longer we are idle.
	double idle_seconds = SystemInformation.InputIdleTime;
	double scale = 1.0;
	bool idle = false;
	bool throttle = false;

	// Never speed up if we are using the battery.
	if (idle_seconds > idle_threshold && ! SystemInformation.UsingBattery) {
		idle = true;
		double ramp = (idle_seconds - idle_threshold) / idle_ramp_up_time;
		scale = 1 - Math.Min (ramp, 1.0);
	}

	// True when we have run many tasks back-to-back while the
	// user is active.
	bool overloaded = ! idle && executed_task_count >= task_overload_threshold;

	if (priority_of_next_task == Priority.Immediate) {
		factor = 0;

		if (last_immediate_times [0] != DateTime.MinValue) {
			DateTime newest = last_immediate_times [4];
			DateTime oldest = last_immediate_times [0];

			// An immediate task arrived within the last second
			// and the last five all arrived within one second of
			// each other: events are pouring in, so throttle.
			if (DateTime.Now.Subtract (newest).TotalSeconds <= 1
			    && newest.Subtract (oldest).TotalSeconds <= 1) {
				throttle = true;
				factor = scale * default_idle_rate_factor;
			}
		}

		// After a long run without a break, treat the task as
		// if it were Priority.Delayed.
		if (overloaded)
			factor = scale * default_delayed_rate_factor;

	} else if (priority_of_next_task == Priority.Delayed) {
		factor = scale * default_delayed_rate_factor;
	} else if (priority_of_next_task == Priority.Idle) {
		factor = scale * default_idle_rate_factor;
	}

	// FIXME: we should do something more sophisticated than this
	// with the load average.
	double load = SystemInformation.LoadAverageOneMinute;
	if (load > 3.001)
		factor *= 5.002;
	else if (load > 1.5003)
		factor *= 2.004;

	double delay = factor * duration_of_previous_task;

	// space out delayed tasks a bit when we aren't idle
	if (! idle && priority_of_next_task == Priority.Delayed && delay < 0.5)
		delay = 0.5;

	if (delay > maximum_delay)
		delay = maximum_delay;

	// When throttling, enforce a floor on the delay.
	if (throttle && delay < min_throttled_delay)
		delay = min_throttled_delay;

	// When overloaded, take at least a short break.
	if (overloaded && delay < min_overloaded_delay)
		delay = min_overloaded_delay;

	return delay;
}
778 // The main loop
781 // A convenience function. There should be a
782 // constructor to TimeSpan that does this.
// Convert a number of seconds into a TimeSpan.  Anything below
// one millisecond (including negative values) becomes zero,
// because Monitor.Wait rejects negative time spans.
private static TimeSpan TimeSpanFromSeconds (double t)
{
	double seconds = (t < 0.001) ? 0 : t;

	// TimeSpan.TicksPerSecond is 10,000,000 (1 tick = 100 nanoseconds)
	return new TimeSpan ((long) (seconds * TimeSpan.TicksPerSecond));
}
795 private string status_str = null;
// The scheduler's main loop; Start () runs it on the worker thread.
//
// While `running` is set, each iteration:
//   1. picks the best runnable task under big_lock (waiting on the
//      lock if there is nothing to do or if we should pause first),
//   2. if that task has a collector, gathers other runnable tasks
//      with the same collector, up to the collector's maximum weight,
//   3. removes the chosen tasks from tasks_by_tag, and
//   4. outside the lock, runs the collector's pre-hook, the tasks,
//      and the collector's post-hook.
// Once the loop exits, queued shutdown tasks are executed and
// Cleanup () is called on everything that never ran.
private void Worker ()
{
	DateTime end_time_of_previous_task = DateTime.MinValue;
	double duration_of_previous_task = 0.0;

	Hook pre_hook = null;
	Hook post_hook = null;
	ArrayList to_be_executed = new ArrayList ();
	Hashtable max_priority_by_source = new Hashtable ();
	int executed_task_count = 0;
	StringBuilder status_builder = new StringBuilder ();

	while (running) {

		status_str = "Finding next task to execute";

		lock (big_lock) {

			// If there are no pending tasks, wait
			// on our lock and then re-start our
			// while loop
			if (tasks_by_tag.Count == 0) {
				if (EmptyQueueEvent != null)
					EmptyQueueEvent ();
				status_str = "Waiting on empty queue";
				Monitor.Wait (big_lock);
				executed_task_count = 0;
				continue;
			}

			// Walk across our list of tasks and find
			// the next one to execute.
			DateTime now = DateTime.Now;
			DateTime next_trigger_time = DateTime.MaxValue;

			// Make a first pass over our tasks, finding the
			// highest-priority item per source.
			max_priority_by_source.Clear ();
			foreach (Task task in tasks_by_tag.Values) {
				if (task.Blocked || task.TriggerTime >= now)
					continue;
				if (max_priority_by_source.Contains (task.Source)) {
					Priority p = (Priority) max_priority_by_source [task.Source];
					if (p < task.Priority)
						max_priority_by_source [task.Source] = task.Priority;
				} else {
					max_priority_by_source [task.Source] = task.Priority;
				}
			}

			// Now make a second pass over the tasks and find
			// the highest-priority item.  We use the information
			// from the first pass to correctly prioritize maintenance tasks.
			Task next_task = null;
			foreach (Task task in tasks_by_tag.Values) {
				if (task.Blocked)
					continue;
				if (task.TriggerTime >= now) {
					// Not runnable yet; remember the earliest
					// upcoming trigger time.
					if (task.TriggerTime < next_trigger_time)
						next_trigger_time = task.TriggerTime;
					continue;
				}

				// If this is a maintenance task and there is a higher-priority
				// task from the same source, skip it.
				if (task.Priority == Priority.Maintenance) {
					Priority p = (Priority) max_priority_by_source [task.Source];
					if (p > task.Priority)
						continue;
				}

				if (task.TriggerTime < now) {
					if (next_task == null || next_task.CompareTo (task) < 0)
						next_task = task;
				}
			}

			// If we didn't find a task, wait for the next trigger-time
			// and then re-start our while loop.
			if (next_task == null) {
				if (next_trigger_time == DateTime.MaxValue) {
					status_str = "Waiting for an unblocked task";
					Monitor.Wait (big_lock);
				} else {
					status_str = "Waiting for the next trigger time";
					Monitor.Wait (big_lock, next_trigger_time - now);
				}
				executed_task_count = 0;
				continue;
			}

			// If we did find a task, do we want to execute it right now?
			// Or should we wait a bit?

			// How should we space things out?
			double delay = 0;
			delay = ComputeDelay (next_task.Priority, duration_of_previous_task, executed_task_count);
			delay = Math.Min (delay, (next_trigger_time - now).TotalSeconds);

			// Adjust by the time that has actually elapsed since the
			// last task.
			delay -= (now - end_time_of_previous_task).TotalSeconds;

			// If we still need to wait a bit longer, wait for the appropriate
			// amount of time and then re-start our while loop.
			if (delay > 0.001) {
				status_str = "Waiting for next task.";
				Monitor.Wait (big_lock, TimeSpanFromSeconds (delay));
				executed_task_count = 0;
				continue;
			}

			//
			// If we've made it to this point, it is time to start
			// executing our selected task.
			//

			to_be_executed.Clear ();

			// Forget the hooks of whatever collector we ran last.
			// Without this, a collector's hooks would keep firing
			// around every later task that has no collector.
			pre_hook = null;
			post_hook = null;

			if (next_task.Collector == null) {

				to_be_executed.Add (next_task);

			} else {

				pre_hook = new Hook (next_task.Collector.PreTaskHook);
				post_hook = new Hook (next_task.Collector.PostTaskHook);

				// Find all eligible tasks with the same collector,
				// and add them to the collection list.
				now = DateTime.Now;
				foreach (Task task in tasks_by_tag.Values)
					if (task != next_task
					    && task.Collector == next_task.Collector
					    && !task.Blocked
					    && task.TriggerTime < now)
						to_be_executed.Add (task);

				// Order the tasks from highest to lowest priority.
				// Our original task will always be the first item
				// in the resulting array.
				to_be_executed.Sort ();
				to_be_executed.Add (next_task);
				to_be_executed.Reverse ();

				// Now find how many tasks can be executed before we
				// exceed the collector's maximum weight.  If necessary,
				// prune the list of tasks.
				double remaining_weight;
				remaining_weight = next_task.Collector.GetMaximumWeight ();
				int i = 0;
				while (i < to_be_executed.Count && remaining_weight > 0) {
					Task task;
					task = to_be_executed [i] as Task;
					remaining_weight -= task.Weight;
					++i;
				}
				if (i < to_be_executed.Count)
					to_be_executed.RemoveRange (i, to_be_executed.Count - i);
			}

			// Remove the tasks we are about to execute from our
			// master list.
			foreach (Task task in to_be_executed)
				tasks_by_tag.Remove (task.Tag);

			// Pulse our lock, in case anyone is waiting for it.
			Monitor.Pulse (big_lock);
		}

		// Now actually execute the set of tasks we found.

		status_builder.Length = 0;
		status_builder.Append ("Executing task");
		if (to_be_executed.Count > 1)
			status_builder.Append ('s');
		status_builder.Append ('\n');
		foreach (Task task in to_be_executed) {
			task.AppendToStringBuilder (status_builder);
			status_builder.Append ('\n');
		}
		status_str = status_builder.ToString ();

		DateTime start_time = DateTime.Now;
		if (pre_hook != null) {
			try {
				pre_hook ();
			} catch (Exception ex) {
				Logger.Log.Error (ex, "Caught exception in pre_hook '{0}'", pre_hook);
			}
		}
		foreach (Task task in to_be_executed) {
			task.DoTask ();
			++total_executed_task_count;
			++executed_task_count;
		}
		if (post_hook != null) {
			try {
				post_hook ();
			} catch (Exception ex) {
				Logger.Log.Error (ex, "Caught exception in post_hook '{0}'", post_hook);
			}
		}

		end_time_of_previous_task = DateTime.Now;
		duration_of_previous_task = (end_time_of_previous_task - start_time).TotalSeconds;
	}

	// Execute all shutdown tasks
	foreach (Task task in shutdown_task_queue)
		if (! task.Cancelled && ! task.Blocked)
			task.DoTask ();

	// Call Cleanup on all of our unexecuted tasks
	foreach (Task task in tasks_by_tag.Values)
		task.Cleanup ();

	if (Debug)
		Logger.Log.Debug ("Scheduler.Worker finished");
}
1018 //////////////////////////////////////////////////////////////////////////////
1020 private static StringBuilder cached_sb = new StringBuilder ();
// Take a snapshot of the scheduler's state: the status string, the
// number of tasks executed so far, and textual descriptions of the
// queued tasks, split into pending, future (trigger time not yet
// reached) and blocked, each ordered from highest to lowest
// priority.
public SchedulerInformation GetCurrentStatus ()
{
	SchedulerInformation snapshot = new SchedulerInformation ();

	lock (big_lock) {

		ArrayList pending = new ArrayList ();
		ArrayList future = new ArrayList ();
		ArrayList blocked = new ArrayList ();

		// Sort every queued task into one of the three buckets.
		DateTime now = DateTime.Now;
		foreach (Task task in tasks_by_tag.Values) {
			ArrayList bucket;
			if (task.Blocked)
				bucket = blocked;
			else if (task.TriggerTime > now)
				bucket = future;
			else
				bucket = pending;
			bucket.Add (task);
		}

		ArrayList [] buckets = { pending, future, blocked };
		ArrayList [] outputs = { snapshot.PendingTasks,
					 snapshot.FutureTasks,
					 snapshot.BlockedTasks };

		for (int i = 0; i < buckets.Length; ++i) {
			// Highest priority first
			buckets [i].Sort ();
			buckets [i].Reverse ();

			foreach (Task task in buckets [i]) {
				cached_sb.Length = 0;
				task.AppendToStringBuilder (cached_sb);
				outputs [i].Add (cached_sb.ToString ());
			}
		}

		snapshot.TotalTaskCount = total_executed_task_count;
		snapshot.StatusString = status_str;
	}

	return snapshot;
}
// A serializable snapshot of a scheduler's state.  The task lists
// hold one human-readable string per task.
public class SchedulerInformation {
	[XmlAttribute]
	public int TotalTaskCount = -1;

	[XmlAttribute]
	public string StatusString;

	[XmlArray]
	[XmlArrayItem (ElementName="PendingTask", Type=typeof (string))]
	public ArrayList PendingTasks = new ArrayList ();

	[XmlArray]
	[XmlArrayItem (ElementName="FutureTask", Type=typeof (string))]
	public ArrayList FutureTasks = new ArrayList ();

	[XmlArray]
	[XmlArrayItem (ElementName="BlockedTask", Type=typeof (string))]
	public ArrayList BlockedTasks = new ArrayList ();

	// Render the snapshot as plain text: the task count, the status
	// string (if any), the numbered pending tasks, and then the
	// future and blocked tasks when there are any.
	//
	// The text is built in a local StringBuilder, so concurrent
	// calls cannot interleave their output.
	public string ToHumanReadableString ()
	{
		StringBuilder sb = new StringBuilder ();

		sb.Append ("Scheduler:\n");
		sb.Append ("Count: ").Append (TotalTaskCount);
		sb.Append ('\n');

		if (StatusString != null)
			sb.Append ("Status: ").Append (StatusString).Append ('\n');

		int pos = 1;
		sb.Append ("\nPending Tasks:\n");
		if (PendingTasks != null && PendingTasks.Count > 0) {
			foreach (string task in PendingTasks) {
				sb.Append (pos).Append (' ').Append (task).Append ('\n');
				++pos;
			}
		} else
			sb.Append ("Scheduler queue is empty.\n");

		if (FutureTasks != null && FutureTasks.Count > 0) {
			sb.Append ("\nFuture Tasks:\n");
			foreach (string task in FutureTasks)
				sb.Append (task).Append ('\n');
		}

		if (BlockedTasks != null && BlockedTasks.Count > 0) {
			sb.Append ("\nBlocked Tasks:\n");
			foreach (string task in BlockedTasks)
				sb.Append (task).Append ('\n');
		}

		return sb.ToString ();
	}
}
1138 #if false
1139 class TestTask : Scheduler.Task {
1141 private class TestCollector : Scheduler.ITaskCollector {
1143 public double GetMinimumWeight ()
1145 return 0;
1148 public double GetMaximumWeight ()
1150 return 5;
1153 public void PreTaskHook ()
1155 Console.WriteLine ("+++ Pre-Task Hook");
1158 public void PostTaskHook ()
1160 Console.WriteLine ("+++ Post-Task Hook");
1164 protected override void DoTaskReal ()
1166 Console.WriteLine ("Doing task '{0}' at {1}", Tag, DateTime.Now);
1167 Thread.Sleep (200);
1168 if (Tag == "Bar")
1169 Reschedule = true;
1172 private static void BeginTaskGroup ()
1174 Console.WriteLine ("--- Begin Task Group!");
1177 private static void EndTaskGroup ()
1179 Console.WriteLine ("--- End Task Group!");
1182 public static void Main ()
1184 Scheduler sched = Scheduler.Global;
1186 Scheduler.TaskGroup tg = Scheduler.NewTaskGroup ("foo",
1187 new Scheduler.Hook (BeginTaskGroup),
1188 new Scheduler.Hook (EndTaskGroup));
1190 sched.Start ();
1192 Scheduler.Task task;
1194 task = new TestTask ();
1195 task.Tag = "Foo";
1196 task.AddTaskGroup (tg);
1197 task.Priority = Scheduler.Priority.Delayed;
1198 task.TriggerTime = DateTime.Now.AddSeconds (7);
1199 sched.Add (task);
1201 task = new TestTask ();
1202 task.Tag = "Bar";
1203 task.AddTaskGroup (tg);
1204 task.Priority = Scheduler.Priority.Delayed;
1205 sched.Add (task);
1207 Scheduler.ITaskCollector collector = null;
1208 for (int i = 0; i < 20; ++i) {
1209 if ((i % 10) == 0)
1210 collector = new TestCollector ();
1211 task = new TestTask ();
1212 task.Tag = String.Format ("Baboon {0}", i);
1213 task.Collector = collector;
1214 task.Priority = Scheduler.Priority.Delayed;
1215 sched.Add (task);
1218 while (true) {
1219 Thread.Sleep (1000);
1223 #endif