1 ;;;-----------------------------------------------------------------------------------
;;; description: The following code is about active objects. Active objects are different
;;; from normal Lisp objects - if you send a message to them they work on it in their
;;; own (conceptual) process, but always only one message at a time.
;;; Depending on the mode used for sending, they may
;;; do it concurrently with the calling process:
;;; past: asynchronous message passing
;;; future: asynchronous, but synchronisation via future objects
;;; now: synchronous message passing.
;;; To realize active objects, some other classes are necessary:
;;; scheduler: The scheduler provides processor time for the active objects.
;;; future: Future objects can be returned by an asynchronous function call
;;; instead of the result. When the result is needed, the calling process
;;; does a touch on the future. This touch suspends the process until the
;;; result is computed, and returns the result.
;;; sending-gf: Sending-gfs are generic functions which transform a function call
;;; on an active object directly into a message. Example:
;;; Let foo be a sending-gf with send-func send-future. What happens when
;;; foo is called on an active object? Well, first a future object
;;; and then a message (something like '(foo args future-obj)) is
;;; created. The message is sent to the active object, and foo
;;; returns the future object.
;;; I also realized protected objects, which accept only now-messages and can be
27 ;;; contact : me (Michael Trowe)
31 ;;;-----------------------------------------------------------------------------------
(defvar *current-actor* nil
  "The object on whose behalf the running code executes, or NIL.
Several methods in this file rebind it by using it as a parameter name
(APPLY-MESSAGE and the PROTECTED-OBJ method of SEND-NOW).")

(defvar *get-scheduler* nil
  "Function of no arguments that returns a scheduler.
The SCHEDULER slot initform of ACTIVE-OBJECT funcalls it.")
38 ;;; I have to define the standard scheduler class here, because I use it in an initform
39 ;;; of the active-object class.
(defclass standard-scheduler ()
  ((obj-proc-pairs
    :accessor obj-proc-pairs
    :initform '()
    :documentation "List that starts out empty."))
  (:documentation
   "Default scheduler class; the methods SCHEDULE, UNSCHEDULE,
ACTIVE-OBJECT-WAIT and RUN-MESSAGE in this file specialize on it."))
;;; Install the default scheduler supplier.  Exactly one STANDARD-SCHEDULER
;;; is created here; the closure hands that same instance to every caller.
(let ((shared-scheduler (make-instance 'standard-scheduler)))
  (setq *get-scheduler*
        (lambda ()
          shared-scheduler)))
;;; concurrency-obj is useful when I later define other classes, which are not active objects,
;;; but also have some concurrency features.
(defclass concurrency-obj ()
  ()
  (:documentation
   "Slotless common superclass of ACTIVE-OBJECT and PROTECTED-OBJ.
COMPUTE-SPECIALIZERS uses it as the default specializer name."))
52 (defclass active-object
(concurrency-obj)
53 ((queue :accessor mail-queue
54 :initform
(make-instance 'lockable-queue
))
55 (active-p :initform
()
57 (lock :accessor obj-lock
58 :initform
(acl-compat-mp:make-process-lock
))
59 (scheduler :accessor scheduler
:initform
(funcall *get-scheduler
*))
60 (waiting-pattern :initform
() :accessor waiting-pattern
)
61 (wait-lock :initform
(acl-compat-mp:make-process-lock
) :accessor wait-lock
)
62 (waiting-function :accessor waiting-function
)))
64 ;;; the following classes are useful mainly for low-level stuff, e.g. to implement
65 ;;; the infrastructure for the distributed object service.
66 (defclass delayed-reply-object
(active-object)
67 ((pending-requests :accessor pending-requests
69 (last-id :accessor last-id
71 (current-future :accessor current-future
73 (:documentation
"These objects can store futures, do some other stuff, e.g. delegate
74 a call, and later restore the future and set it to some value"))
(defclass autonomous-object ()
  ((process
    :accessor process
    :documentation
    "Process object; set by the :after method of INITIALIZE-INSTANCE
and used by the KILL and RESET methods for this class."))
  (:documentation
   "these objects don't need a scheduler. This is useful to implement a
never-ending process"))
;;; auxiliary classes
85 ((result :accessor result
)
86 (computed-p :accessor computed-p
:initform
())
87 (waiting-processes :initform
() :accessor waiting-processes
)))
(defclass multi-future (future)
  ((computed-nr
    :accessor computed-nr
    :initform 0
    :documentation
    "Number of replies written so far; also the index into FUTURE-LIST
of the component future that receives the next reply.")
   (nr-of-sends
    :accessor nr-of-sends
    :initarg :nr-of-sends
    :documentation
    "Reply count at which the multi-future itself counts as computed.")
   (future-list
    :accessor future-list
    :initarg :future-list
    :documentation "Component futures, one per expected reply."))
  (:documentation
   "A future that gathers several replies.  See the MULTI-FUTURE method
of WRITE-TO-FUTURE for how replies are distributed over FUTURE-LIST."))
(defclass sending-gf (standard-generic-function)
  ((send-func
    :initarg :send-func
    :initform #'send-now
    :accessor send-func
    :documentation
    "Function that SEND-LAMBDA splices into the generated send method;
defaults to SEND-NOW."))
  (:metaclass funcallable-standard-class)
  (:documentation
   "Generic-function class whose calls on a concurrency object are
turned into messages through SEND-FUNC."))
97 ;;(:default-initargs :send-func #'send-now))
;;; Normally, the object a message is sent to is the first of the arguments of a sending-gf.
;;; But when you define setf forms, the manipulated object comes in the second position
;;; of the lambda list. When (setf (huhu active-obj) new-value) is evaluated, it expands
;;; to something like (funcall #'(setf huhu) new-value active-obj) (well, at least
;;; conceptually). This means you have to do the processing of the lambda list for setf-gf a
105 (defclass setf-gf
(standard-generic-function)
107 (:metaclass funcallable-standard-class
))
109 (defclass sending-setf
(sending-gf setf-gf
)
111 (:metaclass funcallable-standard-class
))
113 ;;; And now comes the implementation stuff,
115 ;;;------------------------------------------------
116 ;;; here is the main part: operations on active objects
117 ;;;-------------------------------------------------
;;; The three forms of sending a message to an actor.
(defmethod send-future ((obj active-object) caller gf-name &rest args)
  "Send the message GF-NAME with ARGS to OBJ together with a fresh future.
Does not touch the future itself; the value is whatever SEND returns."
  (let ((reply (make-future)))
    (send obj caller gf-name args reply)))
(defmethod send-past ((obj active-object) caller gf-name &rest args)
  "Send the message GF-NAME with ARGS to OBJ without a future.
NIL is passed to SEND in the future position."
  (let ((no-reply nil))
    (send obj caller gf-name args no-reply)))
(defmethod send-now ((obj active-object) caller gf-name &rest args)
  "Send the message GF-NAME with ARGS to OBJ and TOUCH what SEND returns.
A fresh future is handed to SEND to carry the reply."
  (let* ((reply (make-future))
         (sent (send obj caller gf-name args reply)))
    (touch sent)))
130 (defmethod multicast ((objs list
) gf-name
&rest args
)
131 (let ((future (make-multi-future (length objs
))))
132 (loop for obj in objs
133 do
(send obj
*current-actor
* gf-name args future
))
136 (defun send (obj caller gf-name args future
)
137 (acl-compat-mp:with-process-lock
((obj-lock obj
))
;; Enqueueing (scheduling) and testing for activation must be serialized with active-test.
;; Otherwise, the following scenario might occur:
;; 1. empty-queue-p in active-test yields true,
;; 2. then the new message is enqueued,
;; 3. then the test (active-p obj) yields true, so the object won't be reactivated,
;; 4. and then the object deactivates, though there is still a message to process.
144 (if (and (waiting-pattern obj
)
145 (message-match (waiting-pattern obj
)
146 (list caller gf-name args
)))
147 (run-message (scheduler obj
)
148 obj caller gf-name args future
)
149 (schedule (scheduler obj
) obj
(list caller gf-name args future
))))
;;; What follows is used to interpret the messages sent to an actor
;;; and to control this process.
157 (defmethod active-test ((obj active-object
))
158 (acl-compat-mp:with-process-lock
((obj-lock obj
))
159 (if (and (empty-queue-p (mail-queue obj
))
160 (not (waiting-pattern obj
)))
161 (unschedule (scheduler obj
) obj
)
164 (defmethod active-body :around
((obj active-object
))
165 (with-simple-restart (abort "Let active object ~a resume." obj
)
168 (defmethod active-body ((obj active-object
))
170 (when (equal *local-host
* "ki18")
171 (format t
"Active-body von ~S mit stream ~S" obj
(stream obj
)))
173 (let ((message (dequeue (mail-queue obj
))))
181 (defmethod apply-message ((*current-actor
* active-object
) *caller
* message-name args future
)
182 (call-gf *current-actor
*
183 (fdefinition message-name
)
184 (if (listp message-name
)
185 (cons (car args
) (cons *current-actor
* (cdr args
)))
186 (cons *current-actor
* args
))
189 (defmethod call-gf ((obj active-object
) (method function
) args future
)
191 (write-to-future future
(restart-case (apply method args
)
196 "Provide a value for the future."))
199 (format t
"Future-value: ")
200 (list (eval (read))))
202 (t (apply method args
))))
205 (defmethod call-gf ((obj delayed-reply-object
) (method function
) args future
)
207 (setf (current-future obj
) future
)
208 (let ((result (apply method args
)))
209 (when (current-future obj
)
210 (write-to-future future result
))))
211 (t (apply method args
)))
212 (setf (current-future obj
) nil
))
216 (defmethod store-future ((obj delayed-reply-object
))
217 (let ((id (next-message-id obj
)))
218 (push (cons id
(current-future obj
))
219 (pending-requests obj
))
220 (setf (current-future obj
) nil
)
(defmethod get-future ((obj delayed-reply-object) id)
  "Remove the entry stored under ID from OBJ's pending requests.
Returns the cdr of that entry (the value STORE-FUTURE consed onto the id),
or NIL when no entry has that id."
  (let ((entry (assoc id (pending-requests obj))))
    ;; The entry was looked up first, so the destructive DELETE cannot
    ;; affect the value handed back.
    (setf (pending-requests obj)
          (delete id (pending-requests obj) :key #'car))
    (cdr entry)))
(defmethod next-message-id ((obj active-object))
  "Advance OBJ's LAST-ID by one, wrapping around at 65536.
Stores the new id in OBJ and returns it."
  (let ((successor (mod (+ (last-id obj) 1) 65536)))
    (setf (last-id obj) successor)))
234 (defmethod initialize-instance :after
((inst autonomous-object
) &rest initargs
)
235 (declare (ignore initargs
))
236 (setf (process inst
) (acl-compat-mp:make-process
:name
(string (class-name (class-of inst
)))
238 (acl-compat-mp:process-preset
(process inst
) #'active-loop inst
)
239 (acl-compat-mp:process-enable
(process inst
)))
241 (defmacro wait-for
((message-name &rest args
) &optional from
(caller '?
))
242 (declare (ignore from
))
243 `(active-object-wait (scheduler *current-actor
*)
249 (list ,@(loop for arg in args
250 collect
(if (eq arg
'?
)
254 (defun message-match (pattern message
)
255 (and (or (eq (first pattern
) '?
)
256 (eq (first pattern
) (first message
)))
257 (or (eq (second pattern
) '?
)
258 (eq (second pattern
) (second message
)))
259 (loop for x in
(third pattern
)
260 for y in
(third message
)
(defmethod kill ((obj active-object))
  "Stop OBJ by asking its own scheduler to unschedule it."
  (let ((its-scheduler (scheduler obj)))
    (unschedule its-scheduler obj)))
(defmethod kill ((obj autonomous-object))
  "Stop OBJ by killing the process stored in its PROCESS slot."
  (let ((own-process (process obj)))
    (acl-compat-mp:process-kill own-process)))
(defmethod reset :before ((obj autonomous-object))
  "Before the primary RESET method runs, reset OBJ's process."
  (let ((own-process (process obj)))
    (acl-compat-mp:process-reset own-process)))
(defmethod reset :after ((obj autonomous-object))
  "After the primary RESET method, preset OBJ's process to call ACTIVE-LOOP on OBJ."
  (let ((own-process (process obj)))
    (acl-compat-mp:process-preset own-process #'active-loop obj)))
(defmethod reset :before ((obj active-object))
  "Before the primary RESET method runs, unschedule OBJ from its scheduler."
  (let ((its-scheduler (scheduler obj)))
    (unschedule its-scheduler obj)))
(defmethod reset :after ((obj active-object))
  "After the primary RESET method, schedule OBJ again.
The empty list is passed to SCHEDULE in the message position."
  (let ((its-scheduler (scheduler obj)))
    (schedule its-scheduler obj '())))
283 ;;;---------------------------------------------------------------
;;; Now the auxiliary stuff: 1. futures
285 ;;;--------------------------------------------------------------
(defun make-future ()
  "Return a newly allocated instance of the class FUTURE."
  (let ((fresh (make-instance 'future)))
    fresh))
(defun make-multi-future (length)
  "Create a MULTI-FUTURE that collects LENGTH replies.
The instance gets LENGTH fresh component futures as its FUTURE-LIST and
LENGTH as its NR-OF-SENDS.  NR-OF-SENDS has no initform, and the
MULTI-FUTURE method of WRITE-TO-FUTURE reads it after every reply, so it
has to be supplied here; otherwise that read would hit an unbound slot."
  (make-instance 'multi-future
                 :nr-of-sends length
                 :future-list (loop repeat length
                                    collect (make-future))))
294 (defmethod touch ((obj t
))
297 (defmethod computed-p ((obj t
))
301 (defmethod touch ((fut future
))
302 (acl-compat-mp:without-interrupts
; FIXME was excl:without-interrupts
303 (unless (computed-p fut
)
304 (push acl-compat-mp
:*current-process
* (waiting-processes fut
)) ; FIXME was sys:*current-process*
305 (acl-compat-mp:process-disable acl-compat-mp
:*current-process
*)))
(defmethod maptouch (func (mfut multi-future))
  "Touch each component future of MFUT in list order and call FUNC on
the touched value.  Returns the list of FUNC's results."
  (mapcar #'(lambda (component)
              (funcall func (touch component)))
          (future-list mfut)))
(defmethod write-to-future ((fut future) result)
  "Store RESULT in FUT, mark FUT as computed, and re-enable every
process recorded in FUT's WAITING-PROCESSES.  Returns NIL."
  ;; Only the two slot writes happen with interrupts disabled.
  (acl-compat-mp:without-interrupts
    (setf (computed-p fut) t
          (result fut) result))
  (dolist (sleeper (waiting-processes fut))
    (acl-compat-mp:process-enable sleeper)))
(defmethod write-to-future ((mfut multi-future) result)
  "Record one more reply in MFUT.
RESULT is written to the component future at index COMPUTED-NR, then
the counter is incremented.  Once the counter equals NR-OF-SENDS, MFUT
is marked computed, its RESULT becomes the list of all component
results, and the processes waiting on MFUT are re-enabled."
  (acl-compat-mp:without-interrupts
    (let ((target (nth (computed-nr mfut) (future-list mfut))))
      (write-to-future target result))
    (incf (computed-nr mfut))
    (when (= (computed-nr mfut) (nr-of-sends mfut))
      (setf (computed-p mfut) t
            (result mfut) (mapcar #'result (future-list mfut)))
      (dolist (sleeper (waiting-processes mfut))
        (acl-compat-mp:process-enable sleeper)))))
330 ;;;; an alternative implementation.
332 (defun make-future ()
333 (let ((fut (make-instance 'future
)))
334 (mp:process-lock
(future-lock future
) 0)
337 (defmethod touch ((fut future
))
338 (mp:process-lock
(future-lock fut
) 1)
339 (mp:process-unlock
(future-lock fut
) 1)
342 (defmethod write-to-future ((fut future
) result
)
343 (setf (result future
) result
)
344 (mp:process-unlock
(future-lock fut
) 0))
347 ;;;-----------------------------------------------
349 ;;;-----------------------------------------------
350 (defmethod schedule ((sched standard-scheduler
) obj message
)
351 (enqueue (mail-queue obj
) message
)
352 (unless (active-p obj
)
353 (setf (active-p obj
) t
)
354 (acl-compat-mp:process-run-function
(list :name
(symbol-name (class-name (class-of obj
)))
(defmethod unschedule ((sched standard-scheduler) obj)
  "Mark OBJ as inactive by clearing its ACTIVE-P flag.
SCHED only selects this method; it is not otherwise used."
  (let ((inactive nil))
    (setf (active-p obj) inactive)))
362 (defmethod unschedule ((sched standard-scheduler
) obj
)
363 (setf (obj-proc-pairs sched
)
364 (loop for obj-procs on
(obj-proc-pairs sched
)
365 until
(eq (caar obj-procs
) obj
)
366 collect
(car obj-procs
) into new-obj-proc-list
367 finally
(progn (when obj-procs
368 (mp:process-kill
(cdar obj-procs
)))
369 (return (append new-obj-proc-list
371 (setf (active-p obj
) nil
))
(defmethod active-loop ((obj active-object))
  "Run ACTIVE-BODY on OBJ repeatedly for as long as ACTIVE-TEST on OBJ
returns true.  The test is evaluated before every iteration.  Returns NIL."
  (do ()
      ((not (active-test obj)))
    (active-body obj)))
377 (defmethod active-object-wait ((sched standard-scheduler
) obj caller message-name args
)
378 (let ((message (find-and-delete (mail-queue obj
) (list caller message-name args
)
379 :test
#'message-match
)))
381 (apply #'apply-message obj message
)
382 (progn (acl-compat-mp:with-process-lock
((obj-lock obj
))
383 (setf (waiting-pattern obj
) (list caller message-name args
)))
384 (acl-compat-mp:process-lock
(wait-lock obj
) 0)
385 (acl-compat-mp:process-lock
(wait-lock obj
) 1)
386 (acl-compat-mp:process-unlock
(wait-lock obj
) 1)
387 (funcall (waiting-function obj
))))))
390 (defmethod run-message ((sched standard-scheduler
) obj caller message-name args future
)
391 (setf (waiting-pattern obj
) ())
392 (setf (waiting-function obj
) #'(lambda ()
398 (acl-compat-mp:process-unlock
(wait-lock obj
) 0))
400 ;;;-----------------------------------------
402 ;;;-----------------------------------------
404 (define-method-combination sending
()
411 (unless (or test primary
)
412 (method-combination-error "A primary method is required"))
413 (flet ((call-methods (methods)
414 (mapcar #'(lambda (method) `(call-method ,method
())) methods
)))
415 (let* ((prime-form (if (or before after
(rest primary
))
416 `(multiple-value-prog1
417 (progn ,@(call-methods before
)
418 (call-method ,(first primary
)
420 ,@(call-methods (reverse after
))))
421 `(call-method ,(first primary
) ())))
422 (form (cond ((not primary
) '(error "A primary method is required"))
423 (around `(call-method ,(first around
)
425 (make-method ,prime-form
))))
429 `(if (call-method ,(first test
) ())
430 (call-method ,(first send
) ())
;;; This is what setf-gfs are good for: to provide a generic interface to the
;;; concurrency-obj in a lambda-list, argument-list etc.
436 (defmethod get-concurrency-obj ((gf standard-generic-function
) args
)
439 (defmethod without-concurrency-obj ((gf standard-generic-function
) args
)
442 (defmethod unite-concurrency-obj-and-others ((gf standard-generic-function
) obj args
)
446 (defmethod get-concurrency-obj ((gf setf-gf
) args
)
(defmethod without-concurrency-obj ((gf setf-gf) args)
  "Return ARGS with its second element dropped.
For a setf generic function the first element is kept and the second
one (the concurrency object) is removed."
  (list* (first args) (rest (rest args))))
(defmethod unite-concurrency-obj-and-others ((gf setf-gf) obj args)
  "Return a list with OBJ inserted as the second element of ARGS.
The first element of ARGS stays in front; the rest follows OBJ."
  (list* (first args) obj (rest args)))
;;; sending-gfs always have the method combination of type sending
;;; and always have a test and a send method.
458 (defmethod initialize-instance :after
((gf sending-gf
) &rest initargs
)
459 (declare (ignore initargs
))
460 (setf (generic-function-method-combination gf
) (find-method-combination gf
'sending
()))
462 (make-instance 'standard-method
463 :function
(compile nil
(test-lambda gf
))
464 :specializers
(compute-specializers gf
)
466 :lambda-list
(generic-function-lambda-list gf
)))
468 (make-instance 'standard-method
469 :function
(compile nil
(send-lambda gf
))
470 :specializers
(compute-specializers gf
)
472 :lambda-list
(generic-function-lambda-list gf
))))
474 (defmethod reinitialize-instance :after
((gf sending-gf
) &rest initargs
)
475 (declare (ignore initargs
))
476 (setf (generic-function-method-combination gf
) (find-method-combination gf
'sending
()))
477 (unless (slot-boundp gf
'send-func
)
478 (setf (send-func gf
) #'send-now
))
480 (make-instance 'standard-method
481 :function
(compile nil
(test-lambda gf
))
482 :specializers
(compute-specializers gf
)
484 :lambda-list
(generic-function-lambda-list gf
)))
486 (make-instance 'standard-method
487 :function
(compile nil
(send-lambda gf
))
488 :specializers
(compute-specializers gf
)
490 :lambda-list
(generic-function-lambda-list gf
))))
(defun test-lambda (gf)
  "Return the lambda expression used as the test method function of GF.
The returned function is true when its object argument is not the
current actor.  When GF's name is a list, as in (setf foo), the object
is the second argument; otherwise it is the first."
  (let ((setf-style-p (listp (generic-function-name gf))))
    (cond (setf-style-p
           '(lambda (new obj &rest rest)
              (declare (ignore new rest))
              (not (eq *current-actor* obj))))
          (t
           '(lambda (obj &rest rest)
              (declare (ignore rest))
              (not (eq *current-actor* obj)))))))
501 (defun proxy-test-lambda (gf)
502 (declare (ignore gf
))
503 '(lambda (&rest rest
)
504 (declare (ignore rest
))
508 (defun send-lambda (gf)
509 (if (listp (generic-function-name gf
))
510 `(lambda (new obj
&rest rest
)
511 (apply ,(send-func gf
)
514 ',(generic-function-name gf
)
517 `(lambda (obj &rest rest
)
518 (apply ,(send-func gf
)
521 ',(generic-function-name gf
)
(defun compute-specializers (gf &optional (specializer-name 'concurrency-obj))
  "Build the specializer list for a method generated on GF.
The class named SPECIALIZER-NAME goes in the concurrency-object
position chosen by UNITE-CONCURRENCY-OBJ-AND-OTHERS; every other
required argument is specialized on the class T."
  (let* ((concurrency-class (find-class specializer-name))
         (other-count (- (length (required-arguments gf)) 1))
         (others (make-list other-count :initial-element (find-class t))))
    (unite-concurrency-obj-and-others gf concurrency-class others)))
531 (defun required-arguments (gf)
532 (loop for req in
(generic-function-lambda-list gf
)
533 while
(not (member req lambda-list-keywords
))
537 ;;; protected objects are useful to enhance performance. You don't need to use an active object
538 ;;; to implement a shared resource. They accept now-messages one-at-a-time (as it should be)
539 ;;; and can be locked.
(defclass protected-obj (concurrency-obj)
  ((obj-lock
    :initform (acl-compat-mp:make-process-lock)
    :accessor obj-lock
    :documentation
    "Process lock taken by SEND-NOW and LOCK, released by UNLOCK.")
   (locker
    :initform :none
    :accessor locker
    :documentation
    "The actor that locked this object through LOCK, or :NONE."))
  (:documentation
   "Object with its own process lock and a record of who locked it."))
(defmethod send-now ((*current-actor* protected-obj) *caller* message-name &rest args)
  "Call the function named MESSAGE-NAME on the protected object.
The protected object is inserted into ARGS as the first argument, or as
the second one when MESSAGE-NAME is a list such as (setf foo), where the
new value comes first.  When *CALLER* is already recorded as the
object's LOCKER the call is made directly; otherwise the object's lock
is held, with *CALLER* as lock value, for the duration of the call.
Returns the values of the called function."
  (let* ((f (fdefinition message-name))
         ;; Build the argument list once; both branches use it.
         (call-args (if (listp message-name)
                        (cons (car args) (cons *current-actor* (cdr args)))
                        (cons *current-actor* args))))
    (if (eq *caller* (locker *current-actor*))
        (apply f call-args)
        ;; Capture the lock and lock value so the cleanup form releases
        ;; exactly what was acquired.
        (let ((the-lock (obj-lock *current-actor*))
              (lock-value *caller*))
          ;; Acquire outside UNWIND-PROTECT: if the acquisition itself
          ;; fails or is aborted, the cleanup must not run and release a
          ;; lock this call never obtained.
          (acl-compat-mp:process-lock the-lock lock-value)
          (unwind-protect
               (apply f call-args)
            (acl-compat-mp:process-unlock the-lock lock-value))))))
(defmethod lock ((obj protected-obj))
  "Acquire OBJ's process lock with the current actor as lock value,
then record the current actor as OBJ's LOCKER.  Returns that actor."
  (let ((owner *current-actor*))
    (acl-compat-mp:process-lock (obj-lock obj) owner)
    (setf (locker obj) owner)))
(defmethod unlock ((obj protected-obj))
  "Reset OBJ's LOCKER to :NONE, then release OBJ's process lock using
the current actor as lock value."
  (let ((owner *current-actor*))
    ;; The locker record is cleared before the lock is released.
    (setf (locker obj) :none)
    (acl-compat-mp:process-unlock (obj-lock obj) owner)))
571 #| here are some examples to test active objects.
575 (defclass buffer
(ncl-object)
576 ((item :accessor buffer-item
:initform
()
577 :moving-behavior
:follow
)))
579 (defpargeneric put-item
:past
(buffer item
))
581 (defpargeneric get-item
:future
(buffer))
585 (defmethod put-item ((buf buffer
) item
)
586 (when (buffer-item buf
)
587 (wait-for (get-item)))
588 (print (setf (buffer-item buf
) item
)))
590 (defmethod get-item ((buf buffer
))
591 (unless (buffer-item buf
)
592 (wait-for (put-item ?
)))
593 (prog1 (print (buffer-item buf
))
594 (setf (buffer-item buf
) ())))
597 an example for protected objects. Be x
= y then change doubles both.
598 but the unlocked change used by concurrent actors may lead to a situation like
:
600 after change by both actors
: x
= 3 , y
= 4
603 (defclass changer
(active-object)
606 (defclass protected-var
(protected-obj)
607 ((x :accessor get-x
:initarg
:x
)
608 (y :accessor get-y
:initarg
:y
)))
610 (defpargeneric get-x
:now
(var))
611 (defpargeneric get-y
:now
(var))
612 (defpargeneric (setf get-x
) :now
(new var
))
613 (defpargeneric (setf get-y
) :now
(new var
))
615 (defpargeneric change
:past
(changer var
))
616 (defpargeneric locked-change
:past
(changer var
))
618 (defmethod change ((changer changer
) var
)
619 (let ((x (get-x var
))
625 (setf (get-x var
) (+ x y
))
627 (setf (get-y var
) (+ x Y
))))
629 (defmethod locked-change ((changer changer
) var
)
631 (let ((x (get-x var
))
637 (setf (get-x var
) (+ x y
))
639 (setf (get-y var
) (+ x y
)))
644 (setf c1
(make-instance 'changer
))
645 (setf c2
(make-instance 'changer
))
646 (setf v
(make-instance 'protected-var
:x
1 :y
1))
653 An example for a multicast
654 (defclass multi-tester
(active-object) ((sleep-time :initarg
:sleep-time
:accessor sleep-time
)))
656 (defpargeneric get-st
:future
(mt))
658 (defmethod get-st ((mt multi-tester
)) (sleep (sleep-time mt
)) (sleep-time mt
))
660 (defun m-test (nr) (maptouch #'print
(multicast (loop for i from nr downto
1
661 collect
(make-instance 'multi-tester
668 ;;;------------------------------------------------------