Merge 0.10->0.11
[prosody.git] / spec / util_async_spec.lua
blobd2de8c94fdbc72b8282e1d5526506cad2b400ca4
1 local async = require "util.async";
3 describe("util.async", function()
-- Debug switch for this spec. When it is on, print() is registered as a
-- simple sink with util.logger; when it is off, the local print() used by
-- the tests below is replaced with a no-op so they stay silent.
local debug = false;
local print = print;
if not debug then
	print = function () end
else
	require "util.logger".add_simple_sink(print);
end
-- Builds a mocked watcher table for a runner.
-- Each of the known watchers (ready/waiting/error) appends an entry to
-- `event_log` of the form { name = <watcher name>, n = <number of arguments
-- after the first>, <second argument> }. Looking up any other watcher name
-- on the returned table raises an assertion failure.
-- @param event_log  array that receives one entry per watcher call
-- @return the mocked watcher table
local function mock_watchers(event_log)
	local function generic_logging_watcher(name)
		return function (...)
			-- The first argument is skipped; only the count of the remaining
			-- arguments and the first of them are recorded.
			table.insert(event_log, { name = name, n = select("#", ...)-1, select(2, ...) });
		end;
	end;
	return setmetatable(mock{
		ready = generic_logging_watcher("ready");
		waiting = generic_logging_watcher("waiting");
		error = generic_logging_watcher("error");
	}, {
		__index = function (_, event)
			-- Unexpected watcher called
			assert(false, "unexpected watcher called: "..event);
		end;
	});
end
-- Creates a runner for the specs: `func` is wrapped in a spy and the runner
-- gets logging mock watchers.
-- @return the runner, the spy wrapping `func`, and the watcher event log
local function new(func)
	local log = {};
	local wrapped = spy.new(func);
	local runner = async.runner(wrapped, mock_watchers(log));
	return runner, wrapped, log;
end
35 describe("#runner", function()
-- Feeds two numeric work items, one after the other, through a fresh runner.
it("should work", function()
	local function expect_number(item)
		assert(type(item) == "number")
	end
	local runner = new(expect_number);
	for _, item in ipairs({ 1, 2 }) do
		runner:run(item);
	end
end);
-- A runner that has not been given any work reports the "ready" state.
it("should be ready after creation", function ()
	local function noop() end
	local runner = new(noop);
	assert.equal(runner.state, "ready");
end);
-- run() without an item and with nothing queued must not invoke the work
-- function; a later run() with an item must.
it("should do nothing if the queue is empty", function ()
	local executed;
	local function mark_executed()
		executed = true;
	end
	local runner = new(mark_executed);

	-- Nothing queued yet
	runner:run();
	assert.equal(runner.state, "ready");
	assert.is_nil(executed);

	-- A real item reaches the work function
	runner:run("hello");
	assert.is_true(executed);
end);
-- enqueue() only stores the item; the work function is not called until
-- run() is invoked.
it("should support queuing work items without running", function ()
	local executed;
	local function mark_executed()
		executed = true;
	end
	local runner = new(mark_executed);

	runner:enqueue("hello");
	assert.equal(runner.state, "ready");
	assert.is_nil(executed);

	runner:run();
	assert.is_true(executed);
end);
-- Three items queued up front are all processed by a single run(), and the
-- last one queued is the last one seen by the work function.
it("should support queuing multiple work items", function ()
	local most_recent;
	local runner, work_spy = new(function (item) most_recent = item; end);
	for _, word in ipairs({ "hello", "there", "world" }) do
		runner:enqueue(word);
	end
	assert.equal(runner.state, "ready");
	runner:run();
	assert.equal(runner.state, "ready");
	assert.spy(work_spy).was.called(3);
	assert.equal(most_recent, "world");
end);
-- Work items of several Lua types (table, number, string and both
-- booleans) are queued and each must be handed to the work function.
it("should support all simple data types", function ()
	local most_recent;
	local runner, work_spy = new(function (item) most_recent = item; end);
	local values = { {}, 123, "hello", true, false };
	local count = #values;

	for _, value in ipairs(values) do
		runner:enqueue(value);
	end
	assert.equal(runner.state, "ready");
	runner:run();
	assert.equal(runner.state, "ready");

	assert.spy(work_spy).was.called(count);
	for _, value in ipairs(values) do
		assert.spy(work_spy).was.called_with(value);
	end
	assert.equal(most_recent, values[count]);
end);
-- A runner created without arguments is handed a function as its work item;
-- the test expects that function to be called.
it("should work with no parameters", function ()
	local outcome = "fail";
	local runner = async.runner();
	local job = spy.new(function ()
		outcome = "success";
	end);
	runner:run(job);
	assert.spy(job).was.called();
	assert.equal(outcome, "success");
end);
-- With no watchers supplied, an error raised by the work item is expected
-- to propagate out of run() with its message intact.
it("supports a default error handler", function ()
	local outcome = "fail";
	local runner = async.runner();
	local job = spy.new(function ()
		error("test error");
	end);
	local function run_job()
		runner:run(job);
	end
	assert.error_matches(run_job, "test error");
	assert.spy(job).was.called();
	-- The job never assigns to `outcome`, so it keeps its initial value
	assert.equal(outcome, "fail");
end);
117 describe("#errors", function ()
-- Checks that an error thrown by the work function is handed to the `error`
-- watcher (and to no other watcher) together with the thrown value, and that
-- the runner ends up in the "ready" state afterwards.
-- Note: the runner is driven while this describe block is being defined;
-- the nested test() blocks only inspect the recorded results.
describe("should notify", function ()
	local last_processed_item, last_error;
	local r;
	r = async.runner(function (item)
		if item == "error" then
			error({ e = "test error" });
		end
		last_processed_item = item;
	end, mock{
		ready = function () end;
		waiting = function () end;
		error = function (runner, err)
			-- The watcher must be told which runner failed
			assert.equal(r, runner);
			last_error = err;
		end;
	});

	-- Simple item, no error
	r:run("hello");
	assert.equal(r.state, "ready");
	assert.equal(last_processed_item, "hello");
	assert.spy(r.watchers.ready).was_not.called();
	assert.spy(r.watchers.error).was_not.called();

	-- Trigger an error inside the runner
	assert.equal(last_error, nil);
	r:run("error");
	test("the correct watcher functions", function ()
		-- Only the error watcher should have been called
		assert.spy(r.watchers.ready).was_not.called();
		assert.spy(r.watchers.waiting).was_not.called();
		assert.spy(r.watchers.error).was.called(1);
	end);
	test("with the correct error", function ()
		-- The error watcher state should be correct, to
		-- demonstrate the error was passed correctly
		assert.is_table(last_error);
		assert.equal(last_error.e, "test error");
		last_error = nil;
	end);
	assert.equal(r.state, "ready");
	-- The failing item never reached the assignment, so "hello" is still the last one
	assert.equal(last_processed_item, "hello");
end);
-- Two ordered tests sharing one runner. The work function fails immediately
-- for "error", fails after a wait for "wait", and otherwise records the item.
-- The do-block keeps the shared state local to these two tests.
do
	local last_processed_item, last_error;
	local r;
	local wait, done;
	r = async.runner(function (item)
		if item == "error" then
			error({ e = "test error" });
		elseif item == "wait" then
			wait, done = async.waiter();
			wait();
			error({ e = "post wait error" });
		end
		last_processed_item = item;
	end, mock({
		ready = function () end;
		waiting = function () end;
		error = function (runner, err)
			assert.equal(r, runner);
			last_error = err;
		end;
	}));

	-- The tests below build on each other's watcher call counts, so they
	-- must run in the order written.
	randomize(false); --luacheck: ignore 113/randomize

	it("should not be fatal to the runner", function ()
		r:run("world");
		assert.equal(r.state, "ready");
		assert.spy(r.watchers.ready).was_not.called();
		assert.equal(last_processed_item, "world");
	end);
	it("should work despite a #waiter", function ()
		-- This test covers an important case where a runner
		-- throws an error while being executed outside of the
		-- main loop. This happens when it was blocked ('waiting'),
		-- and then released (via a call to done()).
		last_error = nil;
		r:run("wait");
		assert.equal(r.state, "waiting");
		assert.spy(r.watchers.waiting).was.called(1);
		done();
		-- At this point an error happens (state goes error->ready)
		assert.equal(r.state, "ready");
		assert.spy(r.watchers.error).was.called(1);
		assert.spy(r.watchers.ready).was.called(1);
		assert.is_table(last_error);
		assert.equal(last_error.e, "post wait error");
		last_error = nil;
		-- A further item must be processed without any additional watcher calls
		r:run("hello again");
		assert.spy(r.watchers.ready).was.called(1);
		assert.spy(r.watchers.waiting).was.called(1);
		assert.spy(r.watchers.error).was.called(1);
		assert.equal(r.state, "ready");
		assert.equal(last_processed_item, "hello again");
	end);
end
-- An error raised for one queued item must not stop the runner from
-- processing the items queued after it.
it("should continue to process work items", function ()
	local last_item;
	local runner, runner_func = new(function (item)
		if item == "error" then
			error("test error");
		end
		last_item = item;
	end);
	runner:enqueue("one");
	runner:enqueue("error");
	runner:enqueue("two");
	runner:run();
	assert.equal(runner.state, "ready");
	-- All three items reached the work function, including the failing one
	assert.spy(runner_func).was.called(3);
	assert.spy(runner.watchers.error).was.called(1);
	assert.spy(runner.watchers.ready).was.called(0);
	assert.spy(runner.watchers.waiting).was.called(0);
	assert.equal(last_item, "two");
end);
-- Same as the previous case, but the failing item first blocks on a waiter
-- and only raises its error once it has been released by done().
it("should continue to process work items during resume", function ()
	local wait, done, last_item;
	local runner, runner_func = new(function (item)
		if item == "wait-error" then
			wait, done = async.waiter();
			wait();
			error("test error");
		end
		last_item = item;
	end);
	runner:enqueue("one");
	runner:enqueue("wait-error");
	runner:enqueue("two");
	runner:run();
	-- Release the blocked item; it errors, then "two" is expected to be processed
	done();
	assert.equal(runner.state, "ready");
	assert.spy(runner_func).was.called(3);
	assert.spy(runner.watchers.error).was.called(1);
	assert.spy(runner.watchers.waiting).was.called(1);
	assert.spy(runner.watchers.ready).was.called(1);
	assert.equal(last_item, "two");
end);
260 end);
261 end);
262 describe("#waiter", function()
-- Calling async.waiter() directly from the test body, i.e. not from inside
-- a runner's work function, is expected to raise an error.
it("should error outside of async context", function ()
	local function create_waiter()
		async.waiter();
	end
	assert.has_error(create_waiter);
end);
-- Items 1 and 2 complete immediately; item 3 blocks on a waiter until
-- done() is called, after which the runner is ready again.
it("should work", function ()
	local wait, done;

	local r = new(function (item)
		assert(type(item) == "number")
		if item == 3 then
			wait, done = async.waiter();
			wait();
		end
	end);

	r:run(1);
	assert.equal(r.state, "ready");
	r:run(2);
	assert.equal(r.state, "ready");
	r:run(3);
	assert.equal(r.state, "waiting");
	done();
	assert.equal(r.state, "ready");
end);
-- An item submitted while the runner is blocked on a waiter leaves the
-- runner in the "waiting" state; after done() the runner becomes ready.
-- The work function asserts that items arrive strictly in sequence.
it("should work", function ()
	local wait, done;
	local last_item = 0;
	local r = new(function (item)
		assert(type(item) == "number")
		assert(item == last_item + 1);
		last_item = item;
		if item == 3 then
			wait, done = async.waiter();
			wait();
		end
	end);

	r:run(1);
	assert.equal(r.state, "ready");
	r:run(2);
	assert.equal(r.state, "ready");
	r:run(3);
	assert.equal(r.state, "waiting");
	-- Submitted while blocked: the runner must stay in "waiting"
	r:run(4);
	assert.equal(r.state, "waiting");
	done();
	assert.equal(r.state, "ready");
end);
-- Three blocking items (value 3) are submitted back to back, followed by a
-- non-blocking one. Each done() releases the current waiter; the runner is
-- expected to remain "waiting" until the third release.
it("should work", function ()
	local wait, done;
	local last_item = 0;
	local r = new(function (item)
		assert(type(item) == "number")
		assert((item == last_item + 1) or item == 3);
		last_item = item;
		if item == 3 then
			wait, done = async.waiter();
			wait();
		end
	end);

	r:run(1);
	assert.equal(r.state, "ready");
	r:run(2);
	assert.equal(r.state, "ready");

	r:run(3);
	assert.equal(r.state, "waiting");
	r:run(3);
	assert.equal(r.state, "waiting");
	r:run(3);
	assert.equal(r.state, "waiting");
	r:run(4);
	assert.equal(r.state, "waiting");

	for i = 1, 3 do
		-- `done` is reassigned by the work function each time a 3 is processed
		done();
		if i < 3 then
			assert.equal(r.state, "waiting");
		end
	end

	assert.equal(r.state, "ready");
end);
-- Two blocking items (value 3) are submitted; after both have been released
-- the runner is ready and processes a further item immediately.
it("should work", function ()
	local wait, done;
	local last_item = 0;
	local r = new(function (item)
		assert(type(item) == "number")
		assert((item == last_item + 1) or item == 3);
		last_item = item;
		if item == 3 then
			wait, done = async.waiter();
			wait();
		end
	end);

	r:run(1);
	assert.equal(r.state, "ready");
	r:run(2);
	assert.equal(r.state, "ready");

	r:run(3);
	assert.equal(r.state, "waiting");
	r:run(3);
	assert.equal(r.state, "waiting");

	for i = 1, 2 do
		-- `done` is reassigned by the work function each time a 3 is processed
		done();
		if i < 2 then
			assert.equal(r.state, "waiting");
		end
	end

	assert.equal(r.state, "ready");
	r:run(4);
	assert.equal(r.state, "ready");

	assert.equal(r.state, "ready");
end);
-- Drives two independent runners and checks that blocking or releasing one
-- of them does not change the state reported by the other.
it("should work with multiple runners in parallel", function ()
	-- Now with multiple runners
	local wait1, done1;
	local last_item1 = 0;
	local r1 = new(function (item)
		assert(type(item) == "number")
		assert((item == last_item1 + 1) or item == 3);
		last_item1 = item;
		if item == 3 then
			wait1, done1 = async.waiter();
			wait1();
		end
	end, "r1");

	local wait2, done2;
	local last_item2 = 0;
	local r2 = new(function (item)
		assert(type(item) == "number")
		assert((item == last_item2 + 1) or item == 3);
		last_item2 = item;
		if item == 3 then
			wait2, done2 = async.waiter();
			wait2();
		end
	end, "r2");

	r1:run(1);
	assert.equal(r1.state, "ready");
	r1:run(2);
	assert.equal(r1.state, "ready");

	-- Block r1 with two pending blocking items
	r1:run(3);
	assert.equal(r1.state, "waiting");
	r1:run(3);
	assert.equal(r1.state, "waiting");

	-- r2 makes progress while r1 stays blocked
	r2:run(1);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(2);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(3);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "waiting");
	done2();

	r2:run(3);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "waiting");
	done2();

	r2:run(4);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	-- Finally release both of r1's blocking items
	for i = 1, 2 do
		done1();
		if i < 2 then
			assert.equal(r1.state, "waiting");
		end
	end

	assert.equal(r1.state, "ready");
	r1:run(4);
	assert.equal(r1.state, "ready");

	assert.equal(r1.state, "ready");
end);
-- Variant of the parallel-runner test in which r1 is also given
-- out-of-sequence items (value 5) that fail the work function's own
-- assertion, both while r1 is idle and while it is blocked. The runner
-- states are checked after every step.
it("should work work with multiple runners in parallel", function ()
	local wait1, done1;
	local last_item1 = 0;
	local r1 = new(function (item)
		print("r1 processing ", item);
		assert(type(item) == "number")
		assert((item == last_item1 + 1) or item == 3);
		last_item1 = item;
		if item == 3 then
			wait1, done1 = async.waiter();
			wait1();
		end
	end, "r1");

	local wait2, done2;
	local last_item2 = 0;
	local r2 = new(function (item)
		print("r2 processing ", item);
		assert.is_number(item);
		assert((item == last_item2 + 1) or item == 3);
		last_item2 = item;
		if item == 3 then
			wait2, done2 = async.waiter();
			wait2();
		end
	end, "r2");

	r1:run(1);
	assert.equal(r1.state, "ready");
	r1:run(2);
	assert.equal(r1.state, "ready");

	-- Out of sequence: fails the work function's assertion, runner stays ready
	r1:run(5);
	assert.equal(r1.state, "ready");

	r1:run(3);
	assert.equal(r1.state, "waiting");
	r1:run(5); -- Will error, when we get to it
	assert.equal(r1.state, "waiting");
	done1();
	assert.equal(r1.state, "ready");
	r1:run(3);
	assert.equal(r1.state, "waiting");

	r2:run(1);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(2);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(3);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "waiting");

	done2();
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(3);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "waiting");

	done2();
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	r2:run(4);
	assert.equal(r1.state, "waiting");
	assert.equal(r2.state, "ready");

	done1();

	assert.equal(r1.state, "ready");
	r1:run(4);
	assert.equal(r1.state, "ready");

	assert.equal(r1.state, "ready");
end);
-- A waiter created with a count of 4 must stay blocked through the first
-- three done() calls and only resume on the fourth.
it("should support multiple done() calls", function ()
	local processed_item;
	local wait, done;
	local r = new(function (item)
		wait, done = async.waiter(4);
		wait();
		processed_item = item;
	end);
	r:run("test");
	for _ = 1, 3 do
		done();
		assert.equal(r.state, "waiting");
		assert.is_nil(processed_item);
	end
	-- Fourth call releases the work function
	done();
	assert.equal(r.state, "ready");
	assert.equal(processed_item, "test");
	assert.spy(r.watchers.error).was_not.called();
end);
-- After a waiter created with a count of 4 has received its four done()
-- calls, a fifth call is expected to raise an error in the caller, without
-- involving the runner's error watcher.
it("should not allow done() to be called more than specified", function ()
	local processed_item;
	local wait, done;
	local r = new(function (item)
		wait, done = async.waiter(4);
		wait();
		processed_item = item;
	end);
	r:run("test");
	for _ = 1, 4 do
		done();
	end
	-- One call too many
	assert.has_error(done);
	assert.equal(r.state, "ready");
	assert.equal(processed_item, "test");
	assert.spy(r.watchers.error).was_not.called();
end);
-- done() may be called before wait() on the same waiter; the work function
-- is then expected to run to completion within the single run() call.
it("should allow done() to be called before wait()", function ()
	local seen_item;
	local runner = new(function (item)
		local wait, done = async.waiter();
		done();
		wait();
		seen_item = item;
	end);

	runner:run("test");
	assert.equal(seen_item, "test");
	assert.equal(runner.state, "ready");

	-- Since the observable state did not change,
	-- the watchers should not have been called
	local watchers = runner.watchers;
	assert.spy(watchers.waiting).was_not.called();
	assert.spy(watchers.ready).was_not.called();
end);
601 end);
-- async.ready() is expected to be falsy when called directly from a test
-- body and truthy when called from inside a runner's work function.
describe("#ready()", function ()
	it("should return false outside an async context", function ()
		assert.falsy(async.ready());
	end);

	it("should return true inside an async context", function ()
		local function check_ready()
			assert.truthy(async.ready());
		end
		local runner = new(check_ready);
		runner:run(true);
		-- The work function must have run, and without raising an error
		assert.spy(runner.func).was.called();
		assert.spy(runner.watchers.error).was_not.called();
	end);
end);
616 end);