Added -window_days setting to Scheduler
[tcl-tlc-base.git] / scripts / scheduler.itcl
blob2136044ee6099ce2e3663229c8cb7e28d14c8a00
1 # vim: ft=tcl foldmethod=marker foldmarker=<<<,>>> ts=4 shiftwidth=4
3 # Expected Datasource schema:
4 # column 0: ID
5 # column 1: event_source_script
7 # event_source_script must define a proc get_events {start_s end_s}
8 # that returns a list of event descriptions, each a list of
9 # {epoc_seconds event_data}
10 # Example (generates hourly events on the hour):
12 # proc get_events {start_s end_s} {
13 # set events {}
14 # set s [expr {($start_s / 3600) * 3600}]
15 # while {$s < $end_s} {
16 # lappend events [list $s [list the_posted_event_for $s]]
17 # incr s 3600
18 # }
19 # return $events
20 # }
22 # Handers fired:
23 # event_fired(script_id, event_data) - When a scheduled event fires
25 class tlc::Scheduler {
26 inherit tlc::Baselog tlc::Handlers
28 constructor {args} {}
29 destructor {}
31 public {
32 variable events_ds ""
33 variable script_col 1
34 variable window_days 1.0
36 # Test framework helpers
37 method _test_pending_events {}
40 private {
41 variable posted
42 variable afterids
43 variable refresh_afterid ""
44 variable current_horizon
46 method fire {s}
47 method process_script {id script start_s end_s}
48 method process_event {event}
49 method setup_events {}
50 method remove_ev_source {id}
51 method update_last_run {{s now}}
52 method refresh {}
53 method get_last_run {}
54 method now {}
56 method new_item {pool id data}
57 method change_item {pool id olddata newdata}
58 method remove_item {pool id olddata}
60 method new_source {id script}
65 body tlc::Scheduler::constructor {args} { #<<<
66 log debug $this
67 package require Tcl 8.5
69 array set posted {}
70 array set afterids {}
72 configure {*}$args
74 foreach reqf {events_ds} {
75 if {![info exists $reqf] || [set $reqf] == ""} {
76 error "Must set -$reqf"
80 if {![itcl::is object $events_ds] || ![$events_ds isa tlc::Datasource]} {
81 error "-events_ds must be a tlc::Datasource or descendant"
84 if {[$events_ds isa tlc::DSchan] || [$events_ds isa tlc::Datasource_filter]} {
85 log debug "-events_ds is a DSchan or Datasource_filter"
86 $events_ds register_handler new_item [code $this new_item]
87 $events_ds register_handler change_item [code $this change_item]
88 $events_ds register_handler remove_item [code $this remove_item]
89 setup_events
90 } else {
91 setup_events
95 #>>>
96 body tlc::Scheduler::destructor {} { #<<<
97 log debug $this
98 after cancel $refresh_afterid; set refresh_afterid ""
100 foreach {s afterid} [array get afterids] {
101 after cancel $afterid
102 array unset afterids $s
105 if {
106 [info exists events_ds] &&
107 [itcl::is object $events_ds] &&
109 [$events_ds isa tlc::DSchan] ||
110 [$events_ds isa tlc::Datasource_filter]
113 $events_ds deregister_handler new_item [code $this new_item]
114 $events_ds deregister_handler change_item [code $this change_item]
115 $events_ds deregister_handler remove_item [code $this remove_item]
116 $events_ds deregister_handler init [code $this setup_events]
120 #>>>
121 body tlc::Scheduler::fire {s} { #<<<
122 set afterinfo {}
123 foreach id [after info] {
124 lappend afterinfo [list $id [after info $id]]
126 log debug "\nposted for this slot: [llength $posted($s)] items: ([join $posted($s) |])\npending afters:\n\t[join $afterinfo \n\t]"
127 if {[info exists afterids($s)]} {
128 after cancel $afterids($s)
129 array unset afterids $s
131 if {![info exists posted($s)]} return
133 foreach event $posted($s) {
134 tlc::try {
135 process_event $event
136 } onerr {
137 default {
138 log error "error processing event: $errmsg\n$::errorInfo"
142 array unset posted $s
144 update_last_run $s
147 #>>>
148 body tlc::Scheduler::process_script {id script start_s end_s} { #<<<
149 log debug
150 set interp [interp create -safe]
152 $interp alias remove_ev_source [code $this remove_ev_source $id]
154 tlc::try {
155 $interp eval $script
156 if {[$interp eval {expr {
157 [info commands get_events] == "get_events"
158 }}]} {
159 set events [$interp eval [list get_events $start_s $end_s]]
160 } else {
161 set events {}
162 log error "Event source $id does not define get_events proc"
164 } onerr {
165 default {
166 log error "Error initializing safe interpreter with script or calling get_events: $errmsg ($::errorCode)\n$::errorInfo"
168 } onok {
169 foreach event $events {
170 lassign $event s event_data
172 if {$s < $start_s || $s > $end_s} continue
174 lappend posted($s) [list $id $event_data]
175 if {![info exists afterids($s)]} {
176 set delta [expr {$s - [now]}]
177 if {$delta < 1} {
178 set delta 1
180 log notice "scheduling after for $delta seconds time (slot $s)"
181 set afterids($s) [after [expr {$delta * 1000}] \
182 [code $this fire $s]]
187 interp delete $interp
190 #>>>
191 body tlc::Scheduler::process_event {event} { #<<<
192 log debug
193 lassign $event script_id event_data
194 invoke_handlers event_fired $script_id $event_data
197 #>>>
198 body tlc::Scheduler::setup_events {} { #<<<
199 log debug
200 after cancel $refresh_afterid; set refresh_afterid ""
202 set last_run [get_last_run]
204 set end_s [expr {[now] + int($window_days * 86400)}]
205 set current_horizon $end_s
207 set id_column [$events_ds cget -id_column]
208 foreach row [$events_ds get_list {}] {
209 set script_id [lindex $row $id_column]
210 new_item foo $script_id $row
213 set delta [expr {($end_s - [now]) * 1000}]
215 set refresh_afterid [after $delta [code $this refresh]]
218 #>>>
219 body tlc::Scheduler::remove_ev_source {id} { #<<<
220 log debug
221 set now [now]
223 foreach {s events} [array get posted] {
224 set new_events {}
225 foreach event $events {
226 set ev_script_id [lindex $event 2]
227 if {$ev_script_id != $id} {
228 lappend new_events $event
232 if {[llength $new_events] != [llength $events]} {
233 if {[llength $new_events] == 0} {
234 if {$s >= $now} {
235 after cancel $afterid
236 array unset afterids $s
237 array unset posted $s
239 } else {
240 set posted($s) $new_events
246 #>>>
247 body tlc::Scheduler::update_last_run {{s now}} { #<<<
248 log debug
249 if {$s == "now"} {
250 set s [now]
253 set fp [open "last_run" w]
254 puts $fp $s
255 close $fp
258 #>>>
259 body tlc::Scheduler::refresh {} { #<<<
260 log debug
261 set now [now]
262 update_last_run $now
264 foreach {s afterid} {
265 if {$s >= $now} {
266 after cancel $afterid
267 array unset afterids $s
268 array unset posted $s
271 setup_events
274 #>>>
275 body tlc::Scheduler::get_last_run {} { #<<<
276 log debug
277 if {[catch {
278 set fp [open "last_run" r]
279 } errmsg]} {
280 set last_run ""
281 } else {
282 set last_run [read $fp]
283 close $fp
286 if {![string is integer -strict $last_run]} {
287 set last_run [now]
290 return $last_run
293 #>>>
294 body tlc::Scheduler::new_item {pool id data} { #<<<
295 log debug
296 set script [lindex $data $script_col]
297 new_source $id $script
300 #>>>
301 body tlc::Scheduler::change_item {pool id olddata newdata} { #<<<
302 set oldscript [lindex $olddata 1]
303 set newscript [lindex $newdata 1]
304 if {$oldscript == $newscript} return
305 log debug
306 remove_ev_source $id
307 new_item $pool $id $newdata
310 #>>>
311 body tlc::Scheduler::remove_item {pool id olddata} { #<<<
312 log debug
313 remove_ev_source $id
316 #>>>
317 body tlc::Scheduler::new_source {id script} { #<<<
318 log debug
319 tlc::assert {[info exists current_horizon]} "current_horizon defined"
320 set start_s [now]
321 set end_s $current_horizon
323 process_script $id $script $start_s $end_s
326 #>>>
327 body tlc::Scheduler::now {} { #<<<
328 return [clock seconds]
331 #>>>
332 body tlc::Scheduler::_test_pending_events {} { #<<<
333 log debug
334 set build {}
335 foreach s [lsort -integer -increasing [array names posted]] {
336 lappend build $s $posted($s)
338 return $build
341 #>>>