Added test.lisp
[netclos.git] / active.lisp
blob489724b52262227e86a84282dcab4ad2551f9107
1 ;;;-----------------------------------------------------------------------------------
2 ;;; name : active.lisp
3 ;;; description: The following code is about active-objects. Active Objects are diffent from
4 ;;; normal Lisp Objets - if you send a message to them they work on it in their
5 ;;; own (conceptual) process, but always only one message at a time.
6 ;;; Depending on the mode used for sending, they may
7 ;;; do it concurrently with the calling process:
8 ;;; past: asynchronous message passing
9 ;;; future: asynchronous, but synchronisation via future objects
10 ;;; now: synchronous message passing.
11 ;;; To realize active objects, some other classes are necessary:
12 ;;; scheduler: The scheduler provides processor-time for the active objects
13 ;;; future: future objects can be returned by an asynchronous functioncall
14 ;;; instead of the result. When the result is needed, the calling process
15 ;;; does a touch on the future. This touch suspends the process until the
16 ;;; result is computed, and returns the result.
17 ;;; sending-gf: Sending-gf's are generic-functions which transform a function call
18 ;;; on an active object directly into a message. Example:
19 ;;; Be foo a sending-gf with send-func send-future. What happens when
20 ;;; foo is called on an active object? Well, first a future object
21 ;;; and then a message (something like '(foo args future-obj)) is
22 ;;; created. The message is send to the active object, and foo
23 ;;; returns the future object.
24 ;;; I also realized protected objects, which accpet only now-messages and can be
25 ;;; locked.
26 ;;; notes :
27 ;;; contact : me (Michael Trowe)
28 ;;; copyright :
29 ;;; history :
30 ;;; contents :
31 ;;;-----------------------------------------------------------------------------------
32 (in-package nc)
33 (defvar *caller* nil)
34 (defvar *current-actor* nil)
35 (defvar *get-scheduler* nil)
37 (shadow 'stream)
38 ;;; I have to define the standard scheduler class here, because I use it in an initform
39 ;;; of the active-object class.
40 (defclass standard-scheduler ()
41 ((obj-proc-pairs :initform () :accessor obj-proc-pairs)))
43 (let ((standard-scheduler (make-instance 'standard-scheduler)))
44 (setq *get-scheduler* (lambda () standard-scheduler)))
46 ;;; the classes
48 ;;; concurency-obj is useful when I later define other classes, wich are not active objects,
49 ;;; but also have some concurrency features.
50 (defclass concurrency-obj () ())
52 (defclass active-object (concurrency-obj)
53 ((queue :accessor mail-queue
54 :initform (make-instance 'lockable-queue))
55 (active-p :initform ()
56 :accessor active-p)
57 (lock :accessor obj-lock
58 :initform (acl-compat-mp:make-process-lock))
59 (scheduler :accessor scheduler :initform (funcall *get-scheduler*))
60 (waiting-pattern :initform () :accessor waiting-pattern)
61 (wait-lock :initform (acl-compat-mp:make-process-lock) :accessor wait-lock)
62 (waiting-function :accessor waiting-function)))
64 ;;; the following classes are useful mainly for low-level stuff, e.g. to implement
65 ;;; the infrastructure for the distributed object service.
66 (defclass delayed-reply-object (active-object)
67 ((pending-requests :accessor pending-requests
68 :initform ())
69 (last-id :accessor last-id
70 :initform 0)
71 (current-future :accessor current-future
72 :initform ()))
73 (:documentation "These objects can store futures, do some other stuff, e.g. delegate
74 a call, and later restore the future and set it to some value"))
77 (defclass autonomous-object ()
78 ((process :accessor process))
79 (:documentation "these objects don't need a scheduler. This is useful to implement a
80 never-ending process"))
83 ;;; auxilliary classes
84 (defclass future ()
85 ((result :accessor result)
86 (computed-p :accessor computed-p :initform ())
87 (waiting-processes :initform () :accessor waiting-processes)))
89 (defclass multi-future (future)
90 ((computed-nr :initform 0 :accessor computed-nr)
91 (nr-of-sends :initarg :nr-of-sends :accessor nr-of-sends)
92 (future-list :initarg :future-list :accessor future-list)))
94 (defclass sending-gf (standard-generic-function)
95 ((send-func :accessor send-func :initarg :send-func :initform #'send-now))
96 (:metaclass funcallable-standard-class))
97 ;;(:default-initargs :send-func #'send-now))
99 ;;; Normally, the object a message is send to, is the first of the arguments of a sending-gf.
100 ;; But when fou define setf forms, the manipulated object comes in the secons position
101 ;;; of the lambda list. When (setf (huhu active-obj) new-value) is evaluated, it expands
102 ;;; to something-like (funcall #'(setf huhu) new-value active-obj) (well, at least
103 ;;; conceptually). This means you have to do the processing of the lambda list for setf-gf a
104 ;;; bit different.
105 (defclass setf-gf (standard-generic-function)
107 (:metaclass funcallable-standard-class))
109 (defclass sending-setf (sending-gf setf-gf)
111 (:metaclass funcallable-standard-class))
113 ;;; And now comes the implementation stuff,
115 ;;;------------------------------------------------
116 ;;; here is the main part: operations on active objects
117 ;;;-------------------------------------------------
119 ;;; The three forms od sending a message to an actor.
121 (defmethod send-future ((obj active-object) caller gf-name &rest args)
122 (send obj caller gf-name args (make-future)))
124 (defmethod send-past ((obj active-object) caller gf-name &rest args)
125 (send obj caller gf-name args nil))
127 (defmethod send-now ((obj active-object) caller gf-name &rest args)
128 (touch (send obj caller gf-name args (make-future))))
130 (defmethod multicast ((objs list) gf-name &rest args)
131 (let ((future (make-multi-future (length objs))))
132 (loop for obj in objs
133 do (send obj *current-actor* gf-name args future))
134 future))
136 (defun send (obj caller gf-name args future)
137 (acl-compat-mp:with-process-lock ((obj-lock obj))
138 ;;Enqueueing (scheduling) and testing for activation must be serialized with active-test. Otherwise,
139 ;;the following scenario might occur:
140 ;;1. empty-queue-p in active-test yields true,
141 ;;2. then the new messsage is enqueued,
142 ;;3. then the test (active-p obj) yields true, so the object won't be reactivated,
143 ;;4. and then the object deactivates, though there is still a message to process.
144 (if (and (waiting-pattern obj)
145 (message-match (waiting-pattern obj)
146 (list caller gf-name args)))
147 (run-message (scheduler obj)
148 obj caller gf-name args future)
149 (schedule (scheduler obj) obj (list caller gf-name args future))))
150 future)
152 ;;; What follows is used to interprete the messages send to an actor
153 ;;; and to controll this process.
157 (defmethod active-test ((obj active-object))
158 (acl-compat-mp:with-process-lock ((obj-lock obj))
159 (if (and (empty-queue-p (mail-queue obj))
160 (not (waiting-pattern obj)))
161 (unschedule (scheduler obj) obj)
162 t)))
164 (defmethod active-body :around ((obj active-object))
165 (with-simple-restart (abort "Let active object ~a resume." obj)
166 (call-next-method)))
168 (defmethod active-body ((obj active-object))
170 (when (equal *local-host* "ki18")
171 (format t "Active-body von ~S mit stream ~S" obj (stream obj)))
173 (let ((message (dequeue (mail-queue obj))))
174 (when message
175 (apply-message obj
176 (first message)
177 (second message)
178 (third message)
179 (fourth message)))))
181 (defmethod apply-message ((*current-actor* active-object) *caller* message-name args future)
182 (call-gf *current-actor*
183 (fdefinition message-name)
184 (if (listp message-name)
185 (cons (car args) (cons *current-actor* (cdr args)))
186 (cons *current-actor* args))
187 future))
189 (defmethod call-gf ((obj active-object) (method function) args future)
190 (cond (future
191 (write-to-future future (restart-case (apply method args)
192 (nil (result)
193 :report
194 (lambda (stream)
195 (format stream
196 "Provide a value for the future."))
197 :interactive
198 (lambda ()
199 (format t "Future-value: ")
200 (list (eval (read))))
201 result))))
202 (t (apply method args))))
205 (defmethod call-gf ((obj delayed-reply-object) (method function) args future)
206 (cond (future
207 (setf (current-future obj) future)
208 (let ((result (apply method args)))
209 (when (current-future obj)
210 (write-to-future future result))))
211 (t (apply method args)))
212 (setf (current-future obj) nil))
216 (defmethod store-future ((obj delayed-reply-object))
217 (let ((id (next-message-id obj)))
218 (push (cons id (current-future obj))
219 (pending-requests obj))
220 (setf (current-future obj) nil)
221 id))
223 (defmethod get-future ((obj delayed-reply-object) id)
224 (prog1 (cdr (assoc id (pending-requests obj)))
225 (setf (pending-requests obj)
226 (delete id (pending-requests obj) :key #'car))))
228 (defmethod next-message-id ((obj active-object))
229 (setf (last-id obj) (mod (1+ (last-id obj)) 65536)))
234 (defmethod initialize-instance :after ((inst autonomous-object) &rest initargs)
235 (declare (ignore initargs))
236 (setf (process inst) (acl-compat-mp:make-process :name (string (class-name (class-of inst)))
237 :quantum 0.2))
238 (acl-compat-mp:process-preset (process inst) #'active-loop inst)
239 (acl-compat-mp:process-enable (process inst)))
241 (defmacro wait-for ((message-name &rest args) &optional from (caller '?))
242 (declare (ignore from))
243 `(active-object-wait (scheduler *current-actor*)
244 *current-actor*
245 ,(if (eq caller '?)
247 caller)
248 ',message-name
249 (list ,@(loop for arg in args
250 collect (if (eq arg '?)
252 arg)))))
254 (defun message-match (pattern message)
255 (and (or (eq (first pattern) '?)
256 (eq (first pattern) (first message)))
257 (or (eq (second pattern) '?)
258 (eq (second pattern) (second message)))
259 (loop for x in (third pattern)
260 for y in (third message)
261 always (or (eq x '?)
262 (equal x y)))))
265 (defmethod kill ((obj active-object))
266 (unschedule (scheduler obj) obj))
268 (defmethod kill ((obj autonomous-object))
269 (acl-compat-mp:process-kill (process obj)))
271 (defmethod reset :before ((obj autonomous-object))
272 (acl-compat-mp:process-reset (process obj)))
274 (defmethod reset :after ((obj autonomous-object))
275 (acl-compat-mp:process-preset (process obj) #'active-loop obj))
277 (defmethod reset :before ((obj active-object))
278 (unschedule (scheduler obj) obj))
280 (defmethod reset :after ((obj active-object))
281 (schedule (scheduler obj) obj ()))
283 ;;;---------------------------------------------------------------
284 ;;; Now the auxilliary stuff: 1. futures
285 ;;;--------------------------------------------------------------
286 (defun make-future ()
287 (make-instance 'future))
289 (defun make-multi-future (length)
290 (make-instance 'multi-future
291 :nr-of-sends length
292 :future-list (loop repeat length collect (make-future))))
294 (defmethod touch ((obj t))
295 obj)
297 (defmethod computed-p ((obj t))
301 (defmethod touch ((fut future))
302 (acl-compat-mp:without-interrupts ; FIXME was excl:without-interrupts
303 (unless (computed-p fut)
304 (push acl-compat-mp:*current-process* (waiting-processes fut)) ; FIXME was sys:*current-process*
305 (acl-compat-mp:process-disable acl-compat-mp:*current-process*)))
306 (result fut))
308 (defmethod maptouch (func (mfut multi-future))
309 (loop for fut in (future-list mfut)
310 collect (funcall func (touch fut))))
312 (defmethod write-to-future ((fut future) result)
313 (acl-compat-mp:without-interrupts
314 (setf (computed-p fut) t)
315 (setf (result fut) result))
316 (loop for proc in (waiting-processes fut)
317 do (acl-compat-mp:process-enable proc)))
319 (defmethod write-to-future ((mfut multi-future) result)
320 (acl-compat-mp:without-interrupts
321 (write-to-future (nth (computed-nr mfut) (future-list mfut)) result)
322 (incf (computed-nr mfut))
323 (when (= (computed-nr mfut) (nr-of-sends mfut))
324 (setf (computed-p mfut) t)
325 (setf (result mfut) (mapcar #'result (future-list mfut)))
326 (loop for proc in (waiting-processes mfut)
327 do (acl-compat-mp:process-enable proc)))))
330 ;;;; an alternative implementation.
332 (defun make-future ()
333 (let ((fut (make-instance 'future)))
334 (mp:process-lock (future-lock future) 0)
335 fut))
337 (defmethod touch ((fut future))
338 (mp:process-lock (future-lock fut) 1)
339 (mp:process-unlock (future-lock fut) 1)
340 (result future))
342 (defmethod write-to-future ((fut future) result)
343 (setf (result future) result)
344 (mp:process-unlock (future-lock fut) 0))
347 ;;;-----------------------------------------------
348 ;;; 2. schedulers
349 ;;;-----------------------------------------------
350 (defmethod schedule ((sched standard-scheduler) obj message)
351 (enqueue (mail-queue obj) message)
352 (unless (active-p obj)
353 (setf (active-p obj) t)
354 (acl-compat-mp:process-run-function (list :name (symbol-name (class-name (class-of obj)))
355 :quantum 0.2)
356 #'active-loop obj)))
358 (defmethod unschedule ((sched standard-scheduler) obj)
359 (setf (active-p obj) nil))
362 (defmethod unschedule ((sched standard-scheduler) obj)
363 (setf (obj-proc-pairs sched)
364 (loop for obj-procs on (obj-proc-pairs sched)
365 until (eq (caar obj-procs) obj)
366 collect (car obj-procs) into new-obj-proc-list
367 finally (progn (when obj-procs
368 (mp:process-kill (cdar obj-procs)))
369 (return (append new-obj-proc-list
370 (cdr obj-procs))))))
371 (setf (active-p obj) nil))
373 (defmethod active-loop ((obj active-object))
374 (loop while (active-test obj)
375 do (active-body obj)))
377 (defmethod active-object-wait ((sched standard-scheduler) obj caller message-name args)
378 (let ((message (find-and-delete (mail-queue obj) (list caller message-name args)
379 :test #'message-match)))
380 (if message
381 (apply #'apply-message obj message)
382 (progn (acl-compat-mp:with-process-lock ((obj-lock obj))
383 (setf (waiting-pattern obj) (list caller message-name args)))
384 (acl-compat-mp:process-lock (wait-lock obj) 0)
385 (acl-compat-mp:process-lock (wait-lock obj) 1)
386 (acl-compat-mp:process-unlock (wait-lock obj) 1)
387 (funcall (waiting-function obj))))))
390 (defmethod run-message ((sched standard-scheduler) obj caller message-name args future)
391 (setf (waiting-pattern obj) ())
392 (setf (waiting-function obj) #'(lambda ()
393 (apply-message obj
394 caller
395 message-name
396 args
397 future)))
398 (acl-compat-mp:process-unlock (wait-lock obj) 0))
400 ;;;-----------------------------------------
401 ;;; 3. sending-gf
402 ;;;-----------------------------------------
404 (define-method-combination sending ()
405 ((test (:test))
406 (send (:send))
407 (around (:around))
408 (before (:before))
409 (primary ())
410 (after (:after)))
411 (unless (or test primary)
412 (method-combination-error "A primary method is required"))
413 (flet ((call-methods (methods)
414 (mapcar #'(lambda (method) `(call-method ,method ())) methods)))
415 (let* ((prime-form (if (or before after (rest primary))
416 `(multiple-value-prog1
417 (progn ,@(call-methods before)
418 (call-method ,(first primary)
419 ,(rest primary))
420 ,@(call-methods (reverse after))))
421 `(call-method ,(first primary) ())))
422 (form (cond ((not primary) '(error "A primary method is required"))
423 (around `(call-method ,(first around)
424 (,@(rest around)
425 (make-method ,prime-form))))
426 (t prime-form))))
428 (if (and test send)
429 `(if (call-method ,(first test) ())
430 (call-method ,(first send) ())
431 ,form)
432 form))))
434 ;;; this is what setf-gfs are good for: To provide a generic interface to the
435 ;;; concurrency-obj in an lambda-list, argument-list etc.
436 (defmethod get-concurrency-obj ((gf standard-generic-function) args)
437 (car args))
439 (defmethod without-concurrency-obj ((gf standard-generic-function) args)
440 (cdr args))
442 (defmethod unite-concurrency-obj-and-others ((gf standard-generic-function) obj args)
443 (cons obj args))
446 (defmethod get-concurrency-obj ((gf setf-gf) args)
447 (cadr args))
449 (defmethod without-concurrency-obj ((gf setf-gf) args)
450 (cons (car args) (cddr args)))
452 (defmethod unite-concurrency-obj-and-others ((gf setf-gf) obj args)
453 (cons (car args) (cons obj (cdr args))))
456 ;;; sending-gfs have always the message-combination of type sending
457 ;;; and have always a test and a send method.
458 (defmethod initialize-instance :after ((gf sending-gf) &rest initargs)
459 (declare (ignore initargs))
460 (setf (generic-function-method-combination gf) (find-method-combination gf 'sending ()))
461 (add-method gf
462 (make-instance 'standard-method
463 :function (compile nil (test-lambda gf))
464 :specializers (compute-specializers gf)
465 :qualifiers '(:test)
466 :lambda-list (generic-function-lambda-list gf)))
467 (add-method gf
468 (make-instance 'standard-method
469 :function (compile nil (send-lambda gf))
470 :specializers (compute-specializers gf)
471 :qualifiers '(:send)
472 :lambda-list (generic-function-lambda-list gf))))
474 (defmethod reinitialize-instance :after ((gf sending-gf) &rest initargs)
475 (declare (ignore initargs))
476 (setf (generic-function-method-combination gf) (find-method-combination gf 'sending ()))
477 (unless (slot-boundp gf 'send-func)
478 (setf (send-func gf) #'send-now))
479 (add-method gf
480 (make-instance 'standard-method
481 :function (compile nil (test-lambda gf))
482 :specializers (compute-specializers gf)
483 :qualifiers '(:test)
484 :lambda-list (generic-function-lambda-list gf)))
485 (add-method gf
486 (make-instance 'standard-method
487 :function (compile nil (send-lambda gf))
488 :specializers (compute-specializers gf)
489 :qualifiers '(:send)
490 :lambda-list (generic-function-lambda-list gf))))
492 (defun test-lambda (gf)
493 (if (listp (generic-function-name gf))
494 '(lambda (new obj &rest rest)
495 (declare (ignore new rest))
496 (not (eq *current-actor* obj)))
497 '(lambda (obj &rest rest)
498 (declare (ignore rest))
499 (not (eq *current-actor* obj)))))
501 (defun proxy-test-lambda (gf)
502 (declare (ignore gf))
503 '(lambda (&rest rest)
504 (declare (ignore rest))
508 (defun send-lambda (gf)
509 (if (listp (generic-function-name gf))
510 `(lambda (new obj &rest rest)
511 (apply ,(send-func gf)
513 *current-actor*
514 ',(generic-function-name gf)
516 rest))
517 `(lambda (obj &rest rest)
518 (apply ,(send-func gf)
520 *current-actor*
521 ',(generic-function-name gf)
522 rest))))
524 (defun compute-specializers (gf &optional (specializer-name 'concurrency-obj))
525 (unite-concurrency-obj-and-others gf
526 (find-class specializer-name)
527 (make-list (- (length (required-arguments gf)) 1)
528 :initial-element (find-class t))))
531 (defun required-arguments (gf)
532 (loop for req in (generic-function-lambda-list gf)
533 while (not (member req lambda-list-keywords))
534 collect req))
537 ;;; protected objects are useful to enhance performance. You don't need to use an active object
538 ;;; to implement a shared resource. They accept now-messages one-at-a-time (as it should be)
539 ;;; and can be locked.
541 (defclass protected-obj (concurrency-obj)
542 ((obj-lock :accessor obj-lock
543 :initform (acl-compat-mp:make-process-lock))
544 (locker :accessor locker :initform :none)))
546 (defmethod send-now ((*current-actor* protected-obj) *caller* message-name &rest args)
547 (let ((f (fdefinition message-name)))
548 (if (eq *caller* (locker *current-actor*))
549 (apply f (if (listp message-name)
550 (cons (car args) (cons *current-actor* (cdr args)))
551 (cons *current-actor* args)))
552 (unwind-protect (progn
553 (acl-compat-mp:process-lock (obj-lock *current-actor*) *caller*)
554 (apply f (if (listp message-name)
555 (cons (car args) (cons *current-actor* (cdr args)))
556 (cons *current-actor* args))))
557 (acl-compat-mp:process-unlock (obj-lock *current-actor*) *caller*)))))
561 (defmethod lock ((obj protected-obj))
562 (acl-compat-mp:process-lock (obj-lock obj) *current-actor*)
563 (setf (locker obj) *current-actor*))
566 (defmethod unlock ((obj protected-obj))
567 (setf (locker obj) :none)
568 (acl-compat-mp:process-unlock (obj-lock obj) *current-actor*))
571 #| here are some examples to test active objects.
573 a 1 item buffer:
575 (defclass buffer (ncl-object)
576 ((item :accessor buffer-item :initform ()
577 :moving-behavior :follow)))
579 (defpargeneric put-item :past (buffer item))
581 (defpargeneric get-item :future (buffer))
585 (defmethod put-item ((buf buffer) item)
586 (when (buffer-item buf)
587 (wait-for (get-item)))
588 (print (setf (buffer-item buf) item)))
590 (defmethod get-item ((buf buffer))
591 (unless (buffer-item buf)
592 (wait-for (put-item ?)))
593 (prog1 (print (buffer-item buf))
594 (setf (buffer-item buf) ())))
597 an example for protected objects. Be x = y then change doubles both.
598 but the unlocked change used by concurrent actors may lead to a situation like:
599 2 actors, x = y = 1.
600 after change by both actors: x = 3 , y = 4
602 (defvar *time* 10)
603 (defclass changer (active-object)
606 (defclass protected-var (protected-obj)
607 ((x :accessor get-x :initarg :x)
608 (y :accessor get-y :initarg :y)))
610 (defpargeneric get-x :now (var))
611 (defpargeneric get-y :now (var))
612 (defpargeneric (setf get-x) :now (new var))
613 (defpargeneric (setf get-y) :now (new var))
615 (defpargeneric change :past (changer var))
616 (defpargeneric locked-change :past (changer var))
618 (defmethod change ((changer changer) var)
619 (let ((x (get-x var))
620 (y (get-y var))
621 (time *time*))
622 (if (= time 0)
623 (setf *time* 10)
624 (setf *time* 0))
625 (setf (get-x var) (+ x y))
626 (sleep time)
627 (setf (get-y var) (+ x Y))))
629 (defmethod locked-change ((changer changer) var)
630 (lock var)
631 (let ((x (get-x var))
632 (y (get-y var))
633 (time *time*))
634 (if (= time 0)
635 (setf *time* 10)
636 (setf *time* 0))
637 (setf (get-x var) (+ x y))
638 (sleep time)
639 (setf (get-y var) (+ x y)))
640 (unlock var))
644 (setf c1 (make-instance 'changer))
645 (setf c2 (make-instance 'changer))
646 (setf v (make-instance 'protected-var :x 1 :y 1))
647 (change c1 v)
648 (change c2 v)
649 (locked-change c1 v)
650 (locked-change c2 v)
653 An example for a multicast
654 (defclass multi-tester (active-object) ((sleep-time :initarg :sleep-time :accessor sleep-time)))
656 (defpargeneric get-st :future (mt))
658 (defmethod get-st ((mt multi-tester)) (sleep (sleep-time mt)) (sleep-time mt))
660 (defun m-test (nr) (maptouch #'print (multicast (loop for i from nr downto 1
661 collect (make-instance 'multi-tester
662 :sleep-time i))
663 'get-st)))
664 (m-test 5)
667 ;;; the end.
668 ;;;------------------------------------------------------