1 local logger
= require
"util.logger";
2 local log = logger
.init("util.async");
3 local new_id
= require
"util.id".short
;
4 local xpcall
= require
"util.xpcall".xpcall
;
-- Guard for code that must run inside a coroutine ("async context").
-- Reads coroutine.running() and raises an error when there is no current
-- thread, or when the current thread is reported as the main one.
-- NOTE(review): the tail of this function is not visible in this extract.
-- Other functions in this file use its result as the current thread
-- (`local thread = checkthread();`), so it presumably returns `thread` --
-- confirm against the full file.
6 local function checkthread()
7 local thread
, main
= coroutine
.running();
8 if not thread
or main
then
9 error("Not running in an async context, see https://prosody.im/doc/developers/util/async");
-- Tries to recover the runner object that owns `thread` by inspecting the
-- coroutine's stack with the debug library.
-- @param thread coroutine whose stack is inspected
-- Steps visible here:
--   1. count stack levels with debug.getinfo until it returns nothing;
--   2. read local #1 of the outermost frame (level-1) with debug.getlocal;
--   3. reject it unless it is named "self", is a table, and its `.thread`
--      field is this very thread.
-- NOTE(review): the initialisation of `level` and the return statements are
-- not visible in this extract. runner_continue() treats a falsy result as
-- "unable to locate runner", so presumably nil is returned on rejection and
-- `runner` otherwise -- confirm against the full file.
14 local function runner_from_thread(thread
)
16 -- Find the 'level' of the top-most function (0 == current level, 1 == caller, ...)
17 while debug
.getinfo(thread
, level
, "") do level
= level
+ 1; end
18 local name
, runner
= debug
.getlocal(thread
, level
-1, 1);
19 if name
~= "self" or type(runner
) ~= "table" or runner
.thread
~= thread
then
-- Invokes the watcher callback registered under `watcher_name` on a runner.
-- @param runner runner object; its `watchers` table and `:log()` are used
-- @param watcher_name key into runner.watchers (e.g. "error")
-- @param ... extra arguments forwarded to the watcher after `runner`
-- The watcher is called through xpcall (the one imported from util.xpcall at
-- the top of the file) with debug.traceback as the message handler, so a
-- failing watcher yields a traceback string in `err` instead of propagating.
-- NOTE(review): the lines between the lookup and the first log call, and the
-- condition guarding the error log, are not visible in this extract.
-- Presumably a missing watcher is skipped and the error is logged only when
-- `ok` is false -- confirm against the full file.
25 local function call_watcher(runner
, watcher_name
, ...)
26 local watcher
= runner
.watchers
[watcher_name
];
30 runner
:log("debug", "Calling '%s' watcher", watcher_name
);
31 local ok
, err
= xpcall(watcher
, debug
.traceback
, runner
, ...);
33 runner
:log("error", "Error in '%s' watcher: %s", watcher_name
, err
);
-- Resumes a suspended runner coroutine (called by waiter/guard code once the
-- thing being waited for has completed).
-- @param thread the coroutine to resume
-- Visible behaviour:
--   * if the coroutine is not "suspended", an error is logged;
--   * otherwise it is resumed with no arguments; the resume results are read
--     as (ok, state, runner);
--   * on the failure path the runner is looked up via runner_from_thread(),
--     its "error" watcher is called with a traceback of `thread`, and the
--     runner is reset to state "ready" with its thread cleared;
--   * when the coroutine reports state "ready", runner.state is set to
--     "ready" (see the original comment below about also calling :run()).
-- NOTE(review): the declaration of `err`, the `if not ok` / `if not runner`
-- branch heads, the return values and the follow-up :run() calls are not
-- visible in this extract -- confirm against the full file.
39 local function runner_continue(thread
)
40 -- ASSUMPTION: runner is in 'waiting' state (but we don't have the runner to know for sure)
41 if coroutine
.status(thread
) ~= "suspended" then -- This should suffice
42 log("error", "unexpected async state: thread not suspended");
45 local ok
, state
, runner
= coroutine
.resume(thread
);
48 -- Running the coroutine failed, which means we have to find the runner manually,
49 -- in order to inform the error handler
50 runner
= runner_from_thread(thread
);
52 log("error", "unexpected async state: unable to locate runner during error handling");
55 call_watcher(runner
, "error", debug
.traceback(thread
, err
));
56 runner
.state
, runner
.thread
= "ready", nil;
58 elseif state
== "ready" then
59 -- If state is 'ready', it is our responsibility to update runner.state from 'waiting'.
60 -- We also have to :run(), because the queue might have further items that will not be
61 -- processed otherwise. FIXME: It's probably best to do this in a nexttick (0 timer).
62 runner
.state
= "ready";
-- Builds a wait/done pair bound to the current async thread.
-- @param num counter compared against 0 below; the error message
--   "done() called too many times" suggests it counts outstanding done()
--   calls -- TODO confirm its default and how it is decremented
-- Visible behaviour:
--   * checkthread() is called first, so this raises outside an async context;
--   * the wait side returns immediately when `num == 0`, otherwise it yields
--     "wait" to the runner loop;
--   * the done side resumes the captured thread through runner_continue()
--     once `num` reaches 0 while `waiting` is set.
-- NOTE(review): the declaration of `waiting`, the two inner closures'
-- headers and the returned values are not visible in this extract.
68 local function waiter(num
)
69 local thread
= checkthread();
73 if num
== 0 then return; end -- already done
75 coroutine
.yield("wait");
78 if num
== 0 and waiting
then
79 runner_continue(thread
);
81 error("done() called too many times");
-- Creates a "guard" function that keeps a per-id list of waiting threads.
-- The returned function takes (id, func); `id` falls back to a private
-- `default_id` table unique to this guarder instance.
-- Visible behaviour of the returned function:
--   * requires an async context (checkthread());
--   * looks up the guard list for `id` in `guards`;
--   * on one path logs "New guard!"; on another appends the current thread
--     to the list, logs the list length and yields "wait";
--   * defines a local `exit` function that removes the first waiting thread
--     from the list and resumes it via runner_continue(), or logs
--     "Guard off duty.".
-- NOTE(review): the declaration of `guards`, the branch conditions, the use
-- of `func` and what is returned are not visible in this extract --
-- confirm against the full file before relying on these details.
86 local function guarder()
88 local default_id
= {};
89 return function (id
, func
)
90 id
= id
or default_id
;
91 local thread
= checkthread();
92 local guard
= guards
[id
];
96 log("debug", "New guard!");
98 table.insert(guard
, thread
);
99 log("debug", "Guarded. %d threads waiting.", #guard
)
100 coroutine
.yield("wait");
102 local function exit()
103 local next_waiting
= table.remove(guard
, 1);
105 log("debug", "guard: Executing next waiting thread (%d left)", #guard
)
106 runner_continue(next_waiting
);
108 log("debug", "Guard off duty.")
-- Method table for runner objects. Setting __index to the table itself makes
-- methods defined on runner_mt (run, enqueue, log, onready, ...) resolvable
-- on any table that has runner_mt as its metatable.
121 local runner_mt
= {};
122 runner_mt
.__index
= runner_mt
;
-- Creates and primes the coroutine that executes a runner's work items.
-- @param func handler invoked with each value the coroutine is resumed with
-- @param self the runner object; passed into the coroutine on first resume
-- The coroutine body yields ("ready", self) and feeds whatever it is resumed
-- with to `func`. The hook of the calling thread (debug.gethook()) is copied
-- onto the new coroutine, and the coroutine is resumed once so that it parks
-- at its first yield; assert() turns a failure of that first resume into an
-- error.
-- NOTE(review): the loop around the func() call and the return statement are
-- not visible in this extract; runner_mt:run assigns this function's result
-- to `thread`, so it presumably returns the coroutine -- confirm.
124 local function runner_create_thread(func
, self
)
125 local thread
= coroutine
.create(function (self
) -- luacheck: ignore 432/self
127 func(coroutine
.yield("ready", self
));
130 debug
.sethook(thread
, debug
.gethook());
131 assert(coroutine
.resume(thread
, self
)); -- Start it up, it will return instantly to wait for the first input
-- Fallback "error" watcher, installed by runner() when the caller supplies
-- no watchers table: logs the error through the runner's own logger.
-- @param runner runner object the error occurred on
-- @param err error value (formatted with %s)
135 local function default_error_watcher(runner
, err
)
136 runner
:log("error", "Encountered error: %s", err
);
-- Fallback work handler, used by runner() when no `func` is given: treats
-- the queued item itself as a function and calls it with no arguments.
139 local function default_func(f
) f(); end
-- Constructor for runner objects.
-- @param func handler called for each queued item (default: default_func,
--   which calls the item as a function)
-- @param watchers table of state callbacks keyed by state name (default: a
--   table containing only `error = default_error_watcher`)
-- @param data opaque value stored on the runner as `data`
-- The new object starts with thread = false, state and notified_state both
-- "ready", an empty queue, and a dedicated logger named "runner" .. id.
-- NOTE(review): the declaration of `id` and the second argument to
-- setmetatable are not visible in this extract; presumably `id` comes from
-- new_id() (imported at the top of the file) and the metatable is runner_mt
-- -- confirm against the full file.
140 local function runner(func
, watchers
, data
)
142 local _log
= logger
.init("runner" .. id
);
143 return setmetatable({ func
= func
or default_func
, thread
= false, state
= "ready", notified_state
= "ready",
144 queue
= {}, watchers
= watchers
or { error = default_error_watcher
}, data
= data
, id
= id
, _log
= _log
; }
-- runner:run(input) -- queue a work item and process the queue if possible.
-- @param input work item appended to self.queue
-- @return true, state, count -- when the runner is busy: its current state
--   and the queue length; otherwise the state after processing and `n`
-- Visible flow:
--   1. append `input` to the queue;
--   2. if the runner is not "ready", report the current state and return;
--   3. (re)create the coroutine when there is none or it is dead;
--   4. mark the runner "running" and resume the coroutine with queued items
--      while the local state stays "ready" and no error has occurred;
--      a resume error stores a traceback in `err`, a "wait" result switches
--      the local state to "waiting"; both record how many items were consumed;
--   5. shift the unconsumed items to the front of the queue;
--   6. on an error or a state change, log it, remember the new state in
--      notified_state and call the matching watcher with (self, err).
-- NOTE(review): several lines are missing from this extract -- the inner
-- loop header that defines `i`, the declaration of `consumed`, the `if not
-- ok` branch head, the write-back of `state` to self.state, and any
-- condition guarding the initial table.insert. Do not restructure this
-- function based on this view alone.
148 -- Add a task item for the runner to process
149 function runner_mt
:run(input
)
151 table.insert(self
.queue
, input
);
152 --self:log("debug", "queued new work item, %d items queued", #self.queue);
154 if self
.state
~= "ready" then
155 -- The runner is busy. Indicate that the task item has been
156 -- queued, and return information about the current runner state
157 return true, self
.state
, #self
.queue
;
160 local q
, thread
= self
.queue
, self
.thread
;
161 if not thread
or coroutine
.status(thread
) == "dead" then
162 self
:log("debug", "creating new coroutine");
163 -- Create a new coroutine for this runner
164 thread
= runner_create_thread(self
.func
, self
);
165 self
.thread
= thread
;
168 -- Process task item(s) while the queue is not empty, and we're not blocked
169 local n
, state
, err
= #q
, self
.state
, nil;
170 self
.state
= "running";
171 --self:log("debug", "running main loop");
172 while n
> 0 and state
== "ready" and not err
do
174 -- Loop through queue items, and attempt to run them
176 local queued_input
= q
[i
];
177 local ok
, new_state
= coroutine
.resume(thread
, queued_input
);
179 -- There was an error running the coroutine, save the error, mark runner as ready to begin again
180 consumed
, state
, err
= i
, "ready", debug
.traceback(thread
, new_state
);
183 elseif new_state
== "wait" then
184 -- Runner is blocked on waiting for a task item to complete
185 consumed
, state
= i
, "waiting";
189 -- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
190 -- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
191 if not consumed
then consumed
= n
; end
192 -- Remove consumed items from the queue array
193 if q
[n
+1] ~= nil then
197 q
[i
] = q
[consumed
+i
];
201 -- Runner processed all items it can, so save current runner state
203 if err
or state
~= self
.notified_state
then
204 self
:log("debug", "changed state from %s to %s", self
.notified_state
, err
and ("error ("..state
..")") or state
);
208 self
.notified_state
= state
;
210 local handler
= self
.watchers
[state
];
211 if handler
then handler(self
, err
); end
216 return true, state
, n
;
-- runner:enqueue(input) -- append a work item to the queue and log the new
-- queue length at debug level. Unlike :run(), nothing visible here resumes
-- the coroutine.
-- NOTE(review): the return statement is not visible in this extract.
219 -- Add a task item to the queue without invoking the runner, even if it is idle
220 function runner_mt
:enqueue(input
)
221 table.insert(self
.queue
, input
);
222 self
:log("debug", "queued new work item, %d items queued", #self
.queue
);
-- runner:log(level, fmt, ...) -- forward a log call to this runner's private
-- logger (`self._log`, created in runner() via logger.init) and return
-- whatever that logger returns.
226 function runner_mt
:log(level
, fmt
, ...)
227 return self
._log(level
, fmt
, ...);
-- runner:onready(f) -- install `f` as the watcher for the "ready" state.
-- runner_mt:run calls watchers by state name with (runner, err).
-- NOTE(review): the return statement is not visible in this extract.
230 function runner_mt
:onready(f
)
231 self
.watchers
.ready
= f
;
-- runner:onwaiting(f) -- install `f` as the watcher for the "waiting" state.
-- runner_mt:run calls watchers by state name with (runner, err).
-- NOTE(review): the return statement is not visible in this extract.
235 function runner_mt
:onwaiting(f
)
236 self
.watchers
.waiting
= f
;
-- runner:onerror(f) -- install `f` as the "error" watcher, replacing
-- whatever is currently stored under watchers.error. The watcher is invoked
-- through call_watcher() with the runner followed by a traceback string.
-- NOTE(review): the return statement is not visible in this extract.
240 function runner_mt
:onerror(f
)
241 self
.watchers
.error = f
;
-- Non-raising probe for "am I inside an async context?".
-- Returns the results of pcall(checkthread): the first value is true when
-- checkthread() did not raise, and false (followed by the error message)
-- when called outside a runner coroutine.
245 local function ready()
246 return pcall(checkthread
);