cvsimport
[beagle.git] / Util / Scheduler.cs
blobf1712026996405464fec7450ab887b729a41113b
1 //
2 // Scheduler.cs
3 //
4 // Copyright (C) 2004-2005 Novell, Inc.
5 //
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a
9 // copy of this software and associated documentation files (the "Software"),
10 // to deal in the Software without restriction, including without limitation
11 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
12 // and/or sell copies of the Software, and to permit persons to whom the
13 // Software is furnished to do so, subject to the following conditions:
15 // The above copyright notice and this permission notice shall be included in
16 // all copies or substantial portions of the Software.
18 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24 // DEALINGS IN THE SOFTWARE.
27 using System;
28 using System.Collections;
29 using System.Text;
30 using System.Threading;
31 using System.Xml;
32 using System.Xml.Serialization;
34 namespace Beagle.Util {
36 public class Scheduler {
38 // Fire an event if there are no tasks left to execute.
39 public delegate void EmptyQueueDelegate ();
40 public event EmptyQueueDelegate EmptyQueueEvent;
42 public static bool Debug = false;
44 public enum Priority {
46 Shutdown = 0, // Do it on shutdown
48 Idle = 1, // Execute only when the whole machine is idle
49 // Probably should be reserved for computationally-expensive stuff
50 // FIXME: These are not properly scheduled right now
52 Maintenance = 2, // Only execute when there are no lower-priority
53 // tasks from the same source to execute instead
55 Delayed = 3, // Do it soon
57 Immediate = 4, // Do it right now
60 public delegate void Hook ();
61 public delegate void TaskHook (Task task);
63 //////////////////////////////////////////////////////////////////////////////
65 public abstract class Task : IComparable {
67 private string tag = null;
68 private Priority priority = Priority.Delayed;
69 private int sub_priority = 0;
70 private DateTime trigger_time = DateTime.MinValue;
71 private DateTime timestamp; // when added to the scheduler
73 // Some metadata
74 public string Creator;
75 public string Description;
77 public object Source = null; // this is just an opaque identifier
79 public ITaskCollector Collector = null;
80 public double Weight = 1.0;
82 public bool Reschedule = false;
84 private ArrayList task_groups = null;
85 private TaskGroupPrivate child_task_group = null;
87 ///////////////////////////////
90 // The tag is the task's unique identifier
91 public string Tag {
93 get { return tag; }
95 set {
96 // Don't allow us to change the tag of a scheduled task
97 if (tag == null || scheduler == null)
98 tag = value;
99 else
100 throw new Exception ("Can't change tag of " + tag + "!");
104 public Priority Priority {
106 get { return priority; }
108 set {
109 if (priority != value) {
110 priority = value;
111 Recompute ();
116 public int SubPriority {
118 get { return sub_priority; }
120 set {
121 if (sub_priority != value) {
122 sub_priority = value;
123 Recompute ();
128 public DateTime TriggerTime {
130 get { return trigger_time; }
132 set {
133 if (trigger_time != value) {
134 trigger_time = value;
135 Recompute ();
140 public DateTime Timestamp {
141 get { return timestamp; }
144 ///////////////////////////////
146 public void AddTaskGroup (TaskGroup group)
148 if (task_groups == null)
149 task_groups = new ArrayList ();
150 task_groups.Add (group);
153 private void IncrementAllTaskGroups ()
155 if (task_groups != null) {
156 foreach (TaskGroupPrivate group in task_groups) {
157 if (! group.Finished)
158 group.Increment ();
163 private void DecrementAllTaskGroups ()
165 if (task_groups != null) {
166 foreach (TaskGroupPrivate group in task_groups) {
167 if (! group.Finished)
168 group.Decrement ();
173 private void TouchAllTaskGroups ()
175 if (task_groups != null) {
176 foreach (TaskGroupPrivate group in task_groups) {
177 if (! group.Finished)
178 group.Touch ();
183 ///////////////////////////////
185 private Scheduler scheduler = null;
187 public void Schedule (Scheduler scheduler)
189 // Increment the task groups the first
190 // time a task is scheduled.
191 if (this.scheduler == null)
192 IncrementAllTaskGroups ();
193 this.timestamp = DateTime.Now;
194 this.scheduler = scheduler;
195 this.cancelled = false;
198 private void Recompute ()
200 if (scheduler != null)
201 scheduler.Recompute ();
204 ///////////////////////////////
206 // A blocked task will not execute.
207 public bool Blocked {
208 get {
209 // Block the task if we have unexecuted children
210 return child_task_group != null && ! child_task_group.Finished;
214 ///////////////////////////////
216 private bool cancelled = false;
218 public bool Cancelled {
219 get { return cancelled; }
222 public void Cancel ()
224 if (! cancelled) {
225 DecrementAllTaskGroups ();
226 Cleanup (); // clean up after cancelled tasks
228 cancelled = true;
231 ///////////////////////////////
233 // The Task's count keeps track of how many
234 // times it has been executed.
236 private int count = 0;
238 public int Count {
239 get { return count; }
242 ///////////////////////////////
244 public void SpawnChild (Task child_task)
246 if (child_task_group == null)
247 child_task_group = new TaskGroupPrivate ("Children of " + Tag, null, null);
248 child_task.AddTaskGroup (child_task_group);
249 child_task.Source = this.Source;
250 scheduler.Add (child_task);
253 ///////////////////////////////
255 public void DoTask ()
257 if (! cancelled) {
258 if (Debug)
259 Logger.Log.Debug ("Starting task {0}", Tag);
260 child_task_group = null;
261 Reschedule = false;
262 TouchAllTaskGroups ();
264 Stopwatch sw = new Stopwatch ();
265 sw.Start ();
267 try {
268 DoTaskReal ();
269 } catch (Exception ex) {
270 Logger.Log.Warn (ex,
271 "Caught exception in DoTaskReal\n" +
272 " Tag: {0}\n" +
273 " Creator: {0}\n" +
274 "Description: {0}\n" +
275 " Priority: {0} ({1})",
276 Tag, Creator, Description, Priority, SubPriority);
278 sw.Stop ();
279 if (Debug)
280 Logger.Log.Debug ("Finished task {0} in {1}", Tag, sw);
282 if (Reschedule) {
283 ++count;
284 if (Debug)
285 Log.Debug ("Rescheduling task {0}", Tag);
286 scheduler.Add (this); // re-add ourselves
287 } else {
288 DecrementAllTaskGroups ();
289 scheduler = null;
294 protected abstract void DoTaskReal ();
296 ///////////////////////////////
298 // Clean-up is called whenever we know that a task will never
299 // be executed. It is never called on tasks for who DoTaskReal
300 // has been called (except when rescheduled). Cleanup is also
301 // called when a task is cancelled.
303 public void Cleanup ()
305 try {
306 DoCleanup ();
307 } catch (Exception ex) {
308 Logger.Log.Warn (ex, "Caught exception cleaning up task '{0}'", Tag);
312 protected virtual void DoCleanup ()
314 // Do nothing by default
317 ///////////////////////////////
319 // Sort from lowest to highest priority
320 // FIXME: This does not define a total ordering
321 // on the set of all tasks, so use it with care.
322 public int CompareTo (object obj)
324 Task other = obj as Task;
325 if (other == null)
326 return 1;
328 Priority this_priority;
329 Priority other_priority;
331 this_priority = this.Priority;
332 other_priority = other.Priority;
334 // To other sources, Maintenance tasks looks like
335 // Delayed tasks.
336 if (this.Source != other.Source) {
337 if (this_priority == Priority.Maintenance)
338 this_priority = Priority.Delayed;
339 if (other_priority == Priority.Maintenance)
340 other_priority = Priority.Delayed;
343 int cmp;
344 cmp = (int)this_priority - (int)other_priority;
345 if (cmp != 0)
346 return cmp;
348 cmp = this.SubPriority - other.SubPriority;
349 if (cmp != 0)
350 return cmp;
352 // Tasks that were added to the scheduler earlier take
353 // precedence over those that were added later.
354 cmp = DateTime.Compare (other.Timestamp, this.Timestamp);
355 if (cmp != 0)
356 return cmp;
358 // Try to break any ties
359 return this.GetHashCode () - other.GetHashCode ();
362 public void AppendToStringBuilder (StringBuilder sb)
364 sb.Append (Priority).Append (' ').Append (SubPriority);
365 sb.Append (" (").Append (Timestamp).Append (")\n");
367 sb.Append (Tag).Append ('\n');
369 double t = (TriggerTime - DateTime.Now).TotalSeconds;
370 if (t > 0) {
371 if (t < 120)
372 sb.AppendFormat ("Hold for {0:0.00} seconds\n", t);
373 else {
374 sb.Append ("Hold until ").Append (TriggerTime);
375 sb.Append ('\n');
379 if (Creator != null)
380 sb.Append ("Creator: ").Append (Creator).Append ('\n');
382 if (Description != null)
383 sb.Append (Description).Append ('\n');
387 private class TaskHookWrapper : Task {
389 TaskHook hook;
391 public TaskHookWrapper (TaskHook hook)
393 this.hook = hook;
396 protected override void DoTaskReal ()
398 if (hook != null)
399 hook (this);
403 public static Task TaskFromHook (TaskHook hook)
405 return new TaskHookWrapper (hook);
408 //////////////////////////////////////////////////////////////////////////////
411 // Task Groups
414 public static TaskGroup NewTaskGroup (string name, Hook pre_hook, Hook post_hook)
416 return new TaskGroupPrivate (name, pre_hook, post_hook);
419 // The TaskGroup we hand back to the user is an interface that
420 // exposes minimal functionality.
421 public interface TaskGroup {
422 string Name { get; }
423 bool Finished { get; }
426 private class TaskGroupPrivate : TaskGroup {
427 private string name;
428 private int task_count = 0;
429 private bool touched = false;
430 private bool finished = false;
431 private Hook pre_hook;
432 private Hook post_hook;
434 public TaskGroupPrivate (string name,
435 Hook pre_hook,
436 Hook post_hook)
438 this.name = name;
439 this.pre_hook = pre_hook;
440 this.post_hook = post_hook;
443 public string Name {
444 get { return name; }
447 public bool Finished {
448 get { return finished; }
451 // Call this when a task is added to the task group.
452 public void Increment ()
454 if (finished)
455 throw new Exception ("Tried to increment a finished TaskGroup");
456 ++task_count;
459 // Call this when we execute a task in the task group.
460 public void Touch ()
462 if (finished)
463 throw new Exception ("Tried to touch a finished TaskGroup");
465 if (! touched) {
466 if (pre_hook != null) {
467 try {
468 pre_hook ();
469 } catch (Exception ex) {
470 Logger.Log.Warn (ex, "Caught exception in pre_hook of task group '{0}'", Name);
473 touched = true;
477 // Call this after a task in the task group is complete.
478 public void Decrement ()
480 if (finished)
481 throw new Exception ("Tried to decrement a finished TaskGroup");
483 --task_count;
484 // Only fire our post-hook if the pre-hook fired
485 // (or would have fired, had it been non-null)
486 if (task_count == 0 && touched) {
487 if (post_hook != null) {
488 try {
489 post_hook ();
490 } catch (Exception ex) {
491 Logger.Log.Warn (ex, "Caught exception in post_hook of task group '{0}'", Name);
494 finished = true;
499 //////////////////////////////////////////////////////////////////////////////
502 // Task Collector
504 // This is a mechanism for executing tasks in sets, possibly outside of
505 // priority order.
508 public interface ITaskCollector {
510 double GetMaximumWeight ();
512 void PreTaskHook ();
513 void PostTaskHook ();
516 //////////////////////////////////////////////////////////////////////////////
518 private static double global_delay = -1.0;
520 static Scheduler ()
522 string exercise;
523 exercise = Environment.GetEnvironmentVariable ("BEAGLE_EXERCISE_THE_DOG");
525 if (exercise != null) {
526 Log.Info ("BEAGLE_EXERCISE_THE_DOG is set.");
528 if (exercise.Length > 2 && exercise [0] == 't')
529 global_delay = Double.Parse (exercise.Substring (1));
530 else
531 global_delay = 0.0;
535 //////////////////////////////////////////////////////////////////////////////
537 private static Scheduler global = new Scheduler ();
539 public static Scheduler Global {
540 get { return global; }
543 //////////////////////////////////////////////////////////////////////////////
545 private object big_lock = new object ();
547 // FIXME: shutdown tasks should probably be ordered by something
548 private Queue shutdown_task_queue = new Queue ();
550 private Hashtable tasks_by_tag = new Hashtable ();
551 private int total_executed_task_count = 0;
553 public void Add (Task task)
555 if (task == null)
556 return;
558 if (task.Source == null)
559 throw new Exception ("Attempting to add Task with no source!");
561 Task old_task = null;
563 lock (big_lock) {
565 // Keep track of when immediate priority tasks are
566 // added so that we can throttle if the scheduler
567 // is being slammed with them.
568 if (task.Priority == Priority.Immediate) {
569 // Shift our times down by one
570 Array.Copy (last_immediate_times, 1, last_immediate_times, 0, 4);
571 last_immediate_times [4] = DateTime.Now;
574 old_task = tasks_by_tag [task.Tag] as Task;
576 task.Schedule (this);
578 // Re-adding the same task is basically a no-op --- we
579 // just update the timestamp and return.
580 if (old_task == task)
581 return;
583 if (Debug) {
584 Logger.Log.Debug ("Adding task {0}", task.Tag);
585 if (task.Description != null)
586 Logger.Log.Debug (" Desc: {0}", task.Description);
589 if (task.Priority == Priority.Shutdown)
590 shutdown_task_queue.Enqueue (task);
591 else
592 tasks_by_tag [task.Tag] = task;
594 Monitor.Pulse (big_lock);
597 // If we clobbered another task, call cancel on it.
598 // This happens after we release the lock, since
599 // cancellation could result in a task group post-hook
600 // being run.
601 if (old_task != null)
602 old_task.Cancel ();
605 public Task GetByTag (string tag)
607 lock (big_lock)
608 return tasks_by_tag [tag] as Task;
611 public bool ContainsByTag (string tag)
613 Task task = GetByTag (tag);
614 return task != null && !task.Cancelled;
617 public void Recompute ()
619 lock (big_lock)
620 Monitor.Pulse (big_lock);
623 //////////////////////////////////////////////////////////////////////////////
625 private Thread thread = null;
626 public bool running = false;
627 private static bool shutdown_requested = false;
629 public void Start ()
631 lock (this) {
632 if (shutdown_requested || thread != null)
633 return;
634 running = true;
635 thread = ExceptionHandlingThread.Start (new ThreadStart (Worker));
639 public void Stop (bool to_shutdown)
641 lock (big_lock) {
642 shutdown_requested = to_shutdown;
644 if (running) {
645 running = false;
646 thread = null;
647 status_str = "Stopped";
648 Monitor.Pulse (big_lock);
653 public void Stop ()
655 Stop (false);
659 // Delay Computations
661 // This code controls how we space out tasks
664 // FIXME: random magic constants
665 const double idle_threshold = 5.314159 * 60; // probably should be longer
666 const double idle_ramp_up_time = 5.271828 * 60; // probably should be longer
667 const double default_delayed_rate_factor = 9.03; // work about 1/10th of the time
668 const double default_idle_rate_factor = 2.097; // work about 1/3rd of the time
669 const double maximum_delay = 20; // never wait for more than 20s
670 const double min_throttled_delay = 1.5; // never wait less than this when throttled
671 const double min_overloaded_delay = 2.2; // never wait less than this when there are many tasks
672 const int task_overload_threshold = 60; // number of tasks to process before delaying
674 DateTime[] last_immediate_times = new DateTime [5];
676 // The return value and duration_of_previous_task are both measured in seconds.
677 private double ComputeDelay (Priority priority_of_next_task,
678 double duration_of_previous_task,
679 int executed_task_count)
681 if (global_delay >= 0.0)
682 return global_delay;
684 double rate_factor;
686 rate_factor = 2.0;
688 // Do everything faster the longer we are idle.
689 double idle_time = SystemInformation.InputIdleTime;
690 double idle_scale = 1.0;
691 bool is_idle = false;
692 bool need_throttle = false;
694 // Never speed up if we are using the battery.
695 if (idle_time > idle_threshold && ! SystemInformation.UsingBattery) {
696 is_idle = true;
697 double t = (idle_time - idle_threshold) / idle_ramp_up_time;
698 idle_scale = (1 - Math.Min (t, 1.0));
701 switch (priority_of_next_task) {
703 case Priority.Immediate:
704 rate_factor = 0;
706 if (last_immediate_times [0] != DateTime.MinValue) {
707 TimeSpan last_add_delta = DateTime.Now.Subtract (last_immediate_times [4]);
709 // If less than a second has gone by since the
710 // last immediate task was added, there is
711 // still a torrent of events coming in, and we
712 // may need to throttle.
713 if (last_add_delta.TotalSeconds <= 1) {
714 TimeSpan between_add_delta = last_immediate_times [4].Subtract (last_immediate_times [0]);
716 // At least 5 immediate tasks have been
717 // added in the last second. We
718 // definitely need to throttle.
719 if (between_add_delta.TotalSeconds <= 1) {
720 need_throttle = true;
721 rate_factor = idle_scale * default_idle_rate_factor;
726 // If we've processed many tasks since the last
727 // time we took a break, ignore the priority and set a
728 // delay equivalent to Priority.Delayed.
729 if (!is_idle && executed_task_count >= task_overload_threshold)
730 rate_factor = idle_scale * default_delayed_rate_factor;
732 break;
734 case Priority.Delayed:
735 rate_factor = idle_scale * default_delayed_rate_factor;
736 break;
738 case Priority.Idle:
739 rate_factor = idle_scale * default_idle_rate_factor;
740 break;
744 // FIXME: we should do something more sophisticated than this
745 // with the load average.
746 // Random numbers galore!
747 double load_average = SystemInformation.LoadAverageOneMinute;
748 if (load_average > 3.001)
749 rate_factor *= 5.002;
750 else if (load_average > 1.5003)
751 rate_factor *= 2.004;
753 double delay = rate_factor * duration_of_previous_task;
755 // space out delayed tasks a bit when we aren't idle
756 if (! is_idle
757 && priority_of_next_task == Priority.Delayed
758 && delay < 0.5)
759 delay = 0.5;
761 if (delay > maximum_delay)
762 delay = maximum_delay;
764 // If we need to throttle, make sure we don't delay less than
765 // a second and some.
766 if (need_throttle && delay < min_throttled_delay)
767 delay = min_throttled_delay;
769 // If we're not idle and we've just processed more
770 // than a certain number of events, take a break.
771 if (! is_idle
772 && executed_task_count >= task_overload_threshold
773 && delay < min_overloaded_delay)
774 delay = min_overloaded_delay;
776 return delay;
780 // The main loop
783 // A convenience function. There should be a
784 // constructor to TimeSpan that does this.
785 private static TimeSpan TimeSpanFromSeconds (double t)
787 // Wait barfs if you hand it a negative TimeSpan,
788 // so we are paranoid;
789 if (t < 0.001)
790 t = 0;
792 // 1 tick = 100 nanoseconds
793 long ticks = (long) (t * 1.0e+7);
794 return new TimeSpan (ticks);
797 private string status_str = null;
799 private void Worker ()
801 DateTime end_time_of_previous_task = DateTime.MinValue;
802 double duration_of_previous_task = 0.0;
804 Hook pre_hook = null;
805 Hook post_hook = null;
806 ArrayList to_be_executed = new ArrayList ();
807 Hashtable max_priority_by_source = new Hashtable ();
808 int executed_task_count = 0;
809 StringBuilder status_builder = new StringBuilder ();
811 while (running) {
813 status_str = "Finding next task to execute";
815 lock (big_lock) {
817 // If there are no pending tasks, wait
818 // on our lock and then re-start our
819 // while loop
820 if (tasks_by_tag.Count == 0) {
821 if (EmptyQueueEvent != null)
822 EmptyQueueEvent ();
823 status_str = "Waiting on empty queue";
824 Monitor.Wait (big_lock);
825 executed_task_count = 0;
826 continue;
829 // Walk across our list of tasks and find
830 // the next one to execute.
831 DateTime now = DateTime.Now;
832 DateTime next_trigger_time = DateTime.MaxValue;
834 // Make a first pass over our tasks, finding the
835 // highest-priority item per source.
836 max_priority_by_source.Clear ();
837 foreach (Task task in tasks_by_tag.Values) {
838 if (task.Blocked || task.TriggerTime >= now)
839 continue;
840 if (max_priority_by_source.Contains (task.Source)) {
841 Priority p = (Priority) max_priority_by_source [task.Source];
842 if (p < task.Priority)
843 max_priority_by_source [task.Source] = task.Priority;
844 } else {
845 max_priority_by_source [task.Source] = task.Priority;
849 // Now make a second pass over the tasks and find
850 // the highest-priority item. We use the information
851 // from the first pass to correctly prioritize maintenance tasks.
852 Task next_task = null;
853 foreach (Task task in tasks_by_tag.Values) {
854 if (task.Blocked)
855 continue;
856 if (task.TriggerTime >= now) {
857 if (task.TriggerTime < next_trigger_time)
858 next_trigger_time = task.TriggerTime;
859 continue;
862 // If this is a maintenance task and there is a high-priority
863 // task from the same source, skip it.
864 if (task.Priority == Priority.Maintenance) {
865 Priority p = (Priority) max_priority_by_source [task.Source];
866 if (p > task.Priority)
867 continue;
870 if (task.TriggerTime < now) {
871 if (next_task == null || next_task.CompareTo (task) < 0)
872 next_task = task;
876 // If we didn't find a task, wait for the next trigger-time
877 // and then re-start our while loop.
878 if (next_task == null) {
879 if (next_trigger_time == DateTime.MaxValue) {
880 status_str = "Waiting for an unblocked task";
881 Monitor.Wait (big_lock);
882 } else {
883 status_str = "Waiting for the next trigger time";
884 Monitor.Wait (big_lock, next_trigger_time - now);
886 executed_task_count = 0;
887 continue;
890 // If we did find a task, do we want to execute it right now?
891 // Or should we wait a bit?
893 // How should we space things out?
894 double delay = 0;
895 delay = ComputeDelay (next_task.Priority, duration_of_previous_task, executed_task_count);
896 delay = Math.Min (delay, (next_trigger_time - now).TotalSeconds);
898 // Adjust by the time that has actually elapsed since the
899 // last task.
900 delay -= (now - end_time_of_previous_task).TotalSeconds;
902 // If we still need to wait a bit longer, wait for the appropriate
903 // amount of time and then re-start our while loop.
904 if (delay > 0.001) {
905 status_str = "Waiting for next task.";
906 Monitor.Wait (big_lock, TimeSpanFromSeconds (delay));
907 executed_task_count = 0;
908 continue;
912 // If we've made it to this point, it is time to start
913 // executing our selected task.
916 to_be_executed.Clear ();
918 if (next_task.Collector == null) {
920 to_be_executed.Add (next_task);
922 } else {
924 pre_hook = new Hook (next_task.Collector.PreTaskHook);
925 post_hook = new Hook (next_task.Collector.PostTaskHook);
927 // Find all eligible tasks with the same collector,
928 // and add them to the collection list.
929 now = DateTime.Now;
930 foreach (Task task in tasks_by_tag.Values)
931 if (task != next_task
932 && task.Collector == next_task.Collector
933 && !task.Blocked
934 && task.TriggerTime < now)
935 to_be_executed.Add (task);
937 // Order the tasks from highest to lowest priority.
938 // Our original task will always be the first item
939 // in the resulting array.
940 to_be_executed.Sort ();
941 to_be_executed.Add (next_task);
942 to_be_executed.Reverse ();
944 // Now find how many tasks can be executed before we
945 // exceed the collector's maximum weight. If necessary,
946 // prune the list of tasks.
947 double remaining_weight;
948 remaining_weight = next_task.Collector.GetMaximumWeight ();
949 int i = 0;
950 while (i < to_be_executed.Count && remaining_weight > 0) {
951 Task task;
952 task = to_be_executed [i] as Task;
953 remaining_weight -= task.Weight;
954 ++i;
956 if (i < to_be_executed.Count)
957 to_be_executed.RemoveRange (i, to_be_executed.Count - i);
960 // Remove the tasks we are about to execute from our
961 // master list.
962 foreach (Task task in to_be_executed)
963 tasks_by_tag.Remove (task.Tag);
965 // Pulse our lock, in case anyone is waiting for it.
966 Monitor.Pulse (big_lock);
969 // Now actually execute the set of tasks we found.
971 status_builder.Length = 0;
972 status_builder.Append ("Executing task");
973 if (to_be_executed.Count > 1)
974 status_builder.Append ('s');
975 status_builder.Append ('\n');
976 foreach (Task task in to_be_executed) {
977 task.AppendToStringBuilder (status_builder);
978 status_builder.Append ('\n');
980 status_str = status_builder.ToString ();
982 DateTime start_time = DateTime.Now;
983 if (pre_hook != null) {
984 try {
985 pre_hook ();
986 } catch (Exception ex) {
987 Logger.Log.Error (ex, "Caught exception in pre_hook '{0}'", pre_hook);
990 foreach (Task task in to_be_executed) {
991 task.DoTask ();
992 ++total_executed_task_count;
993 ++executed_task_count;
995 if (post_hook != null) {
996 try {
997 post_hook ();
998 } catch (Exception ex) {
999 Logger.Log.Error (ex, "Caught exception in post_hook '{0}'", post_hook);
1003 end_time_of_previous_task = DateTime.Now;
1004 duration_of_previous_task = (end_time_of_previous_task - start_time).TotalSeconds;
1007 // Execute all shutdown tasks
1008 foreach (Task task in shutdown_task_queue)
1009 if (! task.Cancelled && ! task.Blocked)
1010 task.DoTask ();
1012 // Call Cleanup on all of our unexecuted tasks
1013 foreach (Task task in tasks_by_tag.Values)
1014 task.Cleanup ();
1016 if (Debug)
1017 Logger.Log.Debug ("Scheduler.Worker finished");
1020 //////////////////////////////////////////////////////////////////////////////
1022 private static StringBuilder cached_sb = new StringBuilder ();
1024 public SchedulerInformation GetCurrentStatus ()
1026 SchedulerInformation current_status = new SchedulerInformation ();
1028 lock (big_lock) {
1030 ArrayList blocked_tasks = new ArrayList ();
1031 ArrayList future_tasks = new ArrayList ();
1032 ArrayList pending_tasks = new ArrayList ();
1034 DateTime now = DateTime.Now;
1035 foreach (Task task in tasks_by_tag.Values) {
1036 if (task.Blocked)
1037 blocked_tasks.Add (task);
1038 else if (task.TriggerTime > now)
1039 future_tasks.Add (task);
1040 else
1041 pending_tasks.Add (task);
1044 blocked_tasks.Sort ();
1045 blocked_tasks.Reverse ();
1047 future_tasks.Sort ();
1048 future_tasks.Reverse ();
1050 pending_tasks.Sort ();
1051 pending_tasks.Reverse ();
1053 foreach (Task task in pending_tasks) {
1054 cached_sb.Length = 0;
1055 task.AppendToStringBuilder (cached_sb);
1056 current_status.PendingTasks.Add (cached_sb.ToString ());
1059 foreach (Task task in future_tasks) {
1060 cached_sb.Length = 0;
1061 task.AppendToStringBuilder (cached_sb);
1062 current_status.FutureTasks.Add (cached_sb.ToString ());
1065 foreach (Task task in blocked_tasks) {
1066 cached_sb.Length = 0;
1067 task.AppendToStringBuilder (cached_sb);
1068 current_status.BlockedTasks.Add (cached_sb.ToString ());
1071 current_status.TotalTaskCount = total_executed_task_count;
1072 current_status.StatusString = status_str;
1076 return current_status;
1081 public class SchedulerInformation {
1082 [XmlAttribute]
1083 public int TotalTaskCount = -1;
1085 [XmlAttribute]
1086 public string StatusString;
1088 [XmlArray]
1089 [XmlArrayItem (ElementName="PendingTask", Type=typeof (string))]
1090 public ArrayList PendingTasks = new ArrayList ();
1092 [XmlArray]
1093 [XmlArrayItem (ElementName="FutureTask", Type=typeof (string))]
1094 public ArrayList FutureTasks = new ArrayList ();
1096 [XmlArray]
1097 [XmlArrayItem (ElementName="BlockedTask", Type=typeof (string))]
1098 public ArrayList BlockedTasks = new ArrayList ();
1100 private static StringBuilder sb = new StringBuilder ();
1102 public string ToHumanReadableString ()
1104 sb.Length = 0;
1106 sb.Append ("Scheduler:\n");
1107 sb.Append ("Count: ").Append (TotalTaskCount);
1108 sb.Append ('\n');
1110 if (StatusString != null)
1111 sb.Append ("Status: ").Append (StatusString).Append ('\n');
1113 int pos = 1;
1114 sb.Append ("\nPending Tasks:\n");
1115 if (PendingTasks != null && PendingTasks.Count > 0) {
1116 foreach (string task in PendingTasks) {
1117 sb.Append (pos).Append (' ').Append (task).Append ('\n');
1118 ++pos;
1120 } else
1121 sb.Append ("Scheduler queue is empty.\n");
1124 if (FutureTasks != null && FutureTasks.Count > 0) {
1125 sb.Append ("\nFuture Tasks:\n");
1126 foreach (string task in FutureTasks)
1127 sb.Append (task).Append ('\n');
1130 if (BlockedTasks != null && BlockedTasks.Count > 0) {
1131 sb.Append ("\nBlocked Tasks:\n");
1132 foreach (string task in BlockedTasks)
1133 sb.Append (task).Append ('\n');
1136 return sb.ToString ();
1140 #if false
1141 class TestTask : Scheduler.Task {
1143 private class TestCollector : Scheduler.ITaskCollector {
1145 public double GetMinimumWeight ()
1147 return 0;
1150 public double GetMaximumWeight ()
1152 return 5;
1155 public void PreTaskHook ()
1157 Console.WriteLine ("+++ Pre-Task Hook");
1160 public void PostTaskHook ()
1162 Console.WriteLine ("+++ Post-Task Hook");
1166 protected override void DoTaskReal ()
1168 Console.WriteLine ("Doing task '{0}' at {1}", Tag, DateTime.Now);
1169 Thread.Sleep (200);
1170 if (Tag == "Bar")
1171 Reschedule = true;
1174 private static void BeginTaskGroup ()
1176 Console.WriteLine ("--- Begin Task Group!");
1179 private static void EndTaskGroup ()
1181 Console.WriteLine ("--- End Task Group!");
1184 public static void Main ()
1186 Scheduler sched = Scheduler.Global;
1188 Scheduler.TaskGroup tg = Scheduler.NewTaskGroup ("foo",
1189 new Scheduler.Hook (BeginTaskGroup),
1190 new Scheduler.Hook (EndTaskGroup));
1192 sched.Start ();
1194 Scheduler.Task task;
1196 task = new TestTask ();
1197 task.Tag = "Foo";
1198 task.AddTaskGroup (tg);
1199 task.Priority = Scheduler.Priority.Delayed;
1200 task.TriggerTime = DateTime.Now.AddSeconds (7);
1201 sched.Add (task);
1203 task = new TestTask ();
1204 task.Tag = "Bar";
1205 task.AddTaskGroup (tg);
1206 task.Priority = Scheduler.Priority.Delayed;
1207 sched.Add (task);
1209 Scheduler.ITaskCollector collector = null;
1210 for (int i = 0; i < 20; ++i) {
1211 if ((i % 10) == 0)
1212 collector = new TestCollector ();
1213 task = new TestTask ();
1214 task.Tag = String.Format ("Baboon {0}", i);
1215 task.Collector = collector;
1216 task.Priority = Scheduler.Priority.Delayed;
1217 sched.Add (task);
1220 while (true) {
1221 Thread.Sleep (1000);
1225 #endif