1 ;;;-----------------------------------------------------------------------------------
2 ;;; name : message-handler
4 ;;; notes : why do I use explicit messages instead of just sending list to be evaluated
5 ;;; in the remote objectspace? Well,
6 ;;; - this allows for special handling of requests
7 ;;; - message-specific packing is possible
8 ;;; - its easier to change the low-level communication mechanism, e.g. to pvm
9 ;;; - packing is done in the objspace-process, so the overhead for the calling
10 ;;; process when passing a message to a proxy is not to big.
11 ;;; contact : me (Michael Trowe)
15 ;;;-----------------------------------------------------------------------------------
20 (defvar *calling-os
* ())
22 (defclass objspace
(delayed-reply-object)
23 ((host :initarg
:host
:accessor host
)
24 (port :initarg
:port
:accessor port
)
25 (stream :initarg
:objstream
:accessor stream
))) ; FIXME was stream enstead of objstream
27 (defpargeneric send-request
:future
(objspace address caller method args
))
29 (defpargeneric send-operation
:past
(objspace address caller method args
))
31 (defpargeneric kernel-send
:future
(objspace message
))
33 (defpargeneric remote-eval
:now
(objspace form
))
35 (defclass receiver
(autonomous-object)
36 ((objspace :accessor objspace
:initarg
:objspace
)
37 (stream :initarg
:stream
:accessor stream
)))
39 (defmethod initialize-instance :after
((inst receiver
) &rest initargs
)
40 (declare (ignore initargs
))
42 #| wird in autonomousobject init-instan. erledigt
43 (setf (process inst
) (mp:make-process
:name
(string (class-name (class-of inst
)))
46 (mp:process-preset
(process inst
) #'active-loop inst
)
47 (mp:process-enable
(process inst
)))
51 (defmethod active-loop ((receiver receiver
))
52 (handler-case (loop while t
53 do
(active-body receiver
))
54 (end-of-file () (connection-lost *manager
* receiver
))))
56 (defmethod active-body ((receiver receiver
))
57 (let ((*calling-os
* (objspace receiver
))
58 (*package
* (find-package "NETCLOS")))
59 (acl-compat-mp:wait-for-input-available
(stream receiver
))
60 ;;;(print (stream receiver) excl::*initial-terminal-io*)
61 ;;The process stops until input through the stream is available.
63 (when (or (equal *local-host
* "ki18"))
66 excl
::stream-read-char
67 excl
::stream-write-char
68 excl
::stream-read-byte excl
::stream-write-byte read
))
71 (read (stream receiver
))))
72 (when (or (equal *local-host
* "ki18")
73 (equal *local-host
* "ki3"))
74 (format t
"B Rec: ~S B: Tran: ~S" (excl::bytes-received
(stream receiver
))
75 (excl::bytes-transmitted
(stream receiver
))))
76 (if (evaluable read-thing
)
82 (eval (read (stream receiver
)))
83 )) ;Read a message and evaluate it.
85 (defun evaluable (thing)
86 (cond ((symbolp thing
)
90 (defmethod schedule ((sched standard-scheduler
) (obj objspace
) message
)
91 (enqueue (mail-queue obj
) message
)
92 (unless (active-p obj
)
93 (setf (active-p obj
) t
)
94 (acl-compat-mp:process-run-function
(list :name
(symbol-name (class-name (class-of obj
)))
99 (defmethod kill :after
((obj objspace
))
100 (close (stream obj
)))
102 (defmethod reset ((obj objspace
))
103 (clear-output (stream obj
)))
105 (defmethod kill :after
((obj receiver
))
106 (close (stream obj
)))
108 (defmethod reset ((rec receiver
))
109 (clear-input (stream rec
)))
111 (defmethod send-request ((space objspace
) address caller method args
)
113 (request-call-message :obj-address address
118 (defmethod send-operation ((space objspace
) address caller method args
)
120 (operation-call-message :obj-address address
125 (defmethod remote-eval ((space objspace
) form
)
126 (kernel-send space
(remote-evaluation-message :form form
)))
128 (defmethod kernel-send :before
((space objspace
) (message request
))
129 (setf (request-message-id message
) (store-future space
)))
132 (defmethod kernel-send ((space objspace
) message
)
134 (let ((*package
* (find-package "NETCLOS")))
135 #|
(when (and (equal (host space
) "ki18")
136 (equal (rmi-class-name message
) 'nc
::function-node
))
137 (trace excl
::stream-read-byte excl
::stream-write-byte
)
138 (print (list "TOki18" message
))
139 (pack message excl
::*initial-terminal-io
*))
142 (pack message
(stream space
))
144 (finish-output (stream space
))
148 (defmethod move ((obj mobile-object
)
150 (let* ((address (touch (kernel-send destination-os
151 (moving-message :moved-obj obj
))))
152 (obj-class-name (class-name (class-of obj
)))
153 (proxy (change-class obj
(ensure-proxy-class obj-class-name
))))
154 ;;; initialisierung analog zu initialize-instance in distribute.lisp
156 (setf (masterclass-name proxy
) obj-class-name
)
157 (setf (remote-os proxy
) destination-os
)
158 (setf (remote-address proxy
) address
)
159 (setf (other-pack-string proxy
)
160 (format nil
"(np ~a '~a (fos \"~a\"))"
161 (remote-address proxy
)
162 (format nil
"~S" (masterclass-name proxy
))
163 (host (remote-os proxy
))))
164 (setf (master-pack-string proxy
) (format nil
"(fetch-obj ~a)"
165 (remote-address proxy
)))
166 (notify-proxy proxy
)))
168 (defmethod receive-reply ((space objspace
) message-id content
)
169 (write-to-future (get-future space message-id
) content
))
171 (defmethod receive-call ((method symbol
) *current-actor
* obj args
)
172 (apply (fdefinition method
) obj args
))
174 (defmethod receive-call ((method cons
) *current-actor
* obj args
)
175 (apply (fdefinition method
) (first args
) obj
(rest args
)))
179 (defmessage request-call
(obj-address caller method arguments
)
182 :receive-action
(touch (receive-call method
184 (fetch-obj obj-address
)
187 (defmessage operation-call
(obj-address caller method arguments
)
189 :receive-action
(receive-call method
191 (fetch-obj obj-address
)
194 (defmessage reply
(message-id content
)
195 :receive-action
(receive-reply *calling-os
* message-id content
))
197 (defmessage moving
((moved-obj :packing move-pack
))
199 :receive-action
(ensure-exported moved-obj
))
201 (defmessage rmi
(class-name initargs-b
) ;remote-make-instance
204 :receive-action
(apply #'make-instance class-name initargs-b
))
206 (defmessage remote-evaluation
(form)
209 :receive-action
(eval form
))
212 (defmethod pack ((obj t
) &optional stream
)
216 (defmethod pack ((obj symbol
) &optional stream
)
219 (print (list 'intern
(list 'quote
(symbol-name obj
)) (intern
220 (package-name (symbol-package obj
)) :keyword
))
224 (defmethod pack ((obj null
) &optional stream
)
225 (write-string "()" stream
)
229 (defmethod pack ((obj cons
) &optional stream
)
230 (write-string "(cons " stream
)
231 (pack (car obj
) stream
)
232 (write-char #\space stream
)
233 (pack (cdr obj
) stream
)
234 (write-char #\
) stream
)
237 (defmethod pack ((obj string
) &optional stream
)
241 (defmethod pack ((array array
) &optional stream
)
242 (write-string "(unpack-array " stream
)
243 (pack (array-dimensions array
) stream
)
244 (write-char #\space stream
)
245 (pack (array-element-type array
) stream
)
246 (write-char #\space stream
)
247 (loop for i from
0 upto
(1- (array-total-size array
))
248 do
(progn (pack (row-major-aref array i
) stream
)
249 (write-char #\space stream
)))
250 (write-char #\
) stream
)
253 (defun unpack-array (dims type
&rest elements
)
254 (let ((ar (make-array dims
:element-type type
)))
255 (loop for i from
0 upto
(1- (array-total-size ar
))
257 do
(progn (setf (row-major-aref ar i
) el
)))
260 (defmethod pack ((func function
) &optional stream
)
261 (let ((denom (get-denom func
)))
263 (progn (write-string "(function " stream
)
264 (write-string (symbol-name denom
) stream
)
265 (write-char #\
) stream
))
266 (progn (write-string "(fnp " stream
)
267 (write (ensure-exported func
) :stream stream
)
268 (write (class-name (class-of func
)) :stream stream
)
269 (write-char #\
) stream
))))
272 (defun fnp (address class-name
)
273 (let ((p (notify-proxy (make-proxy class-name
276 #'(lambda (&rest rest
)
277 (send-now p
'apply rest
))))
279 ;;; ohne *full-packing* wirds dem Benutzer ueberlassen, wie
280 ;;; eine klasse verpackt wird.
282 (defvar *full-packing
* nil
)
283 (defmethod pack :around
((obj standard-object
) &optional stream
)
285 (pack-standard-object obj stream
)
288 (defun pack-standard-object (obj stream
)
289 (let ((reference (gentemp "I")))
290 (setq *pack-forms
* (acons obj reference
292 (format stream
"(progn (proclaim '(special ~a))
293 (prog1 (setq ~a (make-instance '~S))
294 (unpack-slots ~a (list"
295 reference reference
(class-name (class-of obj
)) reference
)
296 (pack-slots-default (class-of obj
) obj stream
)
297 (write-string "))))" stream
))
301 (defun pack-slots-default (class obj stream
)
302 (loop for slot in
(class-slots class
)
304 (move-pack (slot-value obj
(slot-definition-name slot
)) stream
)))
307 (defmethod pack ((obj standard-object
) &optional stream
)
308 (multiple-value-bind (address obj-string
)
309 (ensure-exported obj
)
310 (declare (ignore address
))
311 (write-string obj-string stream
))
314 (defun np (address class-name
&optional
(os *calling-os
*))
315 (notify-proxy (make-proxy class-name
320 (defmethod pack ((proxy proxy
) &optional stream
)
321 (if (eq (remote-os proxy
) *current-actor
*)
322 (write-string (master-pack-string proxy
) stream
)
323 (progn (notify-sending proxy
)
324 (write-string (other-pack-string proxy
) stream
)))
327 (defmethod pack ((space objspace
) &optional stream
)
328 (write-string (concatenate 'string
"(fos \"" (host space
) "\")") stream
)
332 (find host
(spaces *manager
*) :key
#'host
:test
#'equal
))
334 (defmethod print-object ((n objspace
) stream
)
335 (format stream
"#<~S on host: ~S>" (type-of n
) (slot-value n
'host
)))
340 (defmethod pack ((struc structure-object
) &optional stream
)
341 (let* ((class-name (class-name (class-of struc
)))
342 (class-package (symbol-package class-name
)))
343 (format stream
"(~A::MAKE-" (package-name class-package
))
344 (write-string (string class-name
) stream
)
345 (write-char #\space stream
)
346 (loop for slot in
(class-slots (class-of struc
))
347 for name
= (slot-definition-name slot
)
348 do
(progn (write-char #\
: stream
)
349 (write-string (string name
) stream
)
350 (write-char #\space stream
)
351 (pack (funcall (fdefinition (intern (format nil
"~a-~a"
357 (write-char #\
) stream
)
360 (defun objectspace-active-p (objspace)
361 (slot-boundp objspace
'stream
))