3 (defvar *bus
* "epgm://lo;226.0.0.1:5555")
13 (defvar *me
* (sb-unix::posix-getenv
"NODE")
16 "Sequental send-msg counter")
18 "Nodes present at bus")
22 "Last saved generation ID")
24 "Data propagation database")
27 (defvar *not-synced
* t
28 "Ask neighbours for missed updates.")
29 (defvar *save-core
* nil
30 "Flag: save core when possible.")
33 (defvar *coordinator
* nil
34 "`t' if coordinator.")
36 (defun logger (&rest args
)
37 (declare (ignorable args
))
38 (format t
"~a ~a> " (local-time:now
) *me
*)
39 (apply #'format t args
))
41 (defun init-messaging ()
42 (setq *ctx
* (zmq:init
1)
43 *bus-in
* (zmq:socket
*ctx
* zmq
:sub
)
44 *bus-out
* (zmq:socket
*ctx
* zmq
:pub
)
45 *poll-in
* (list (make-instance 'zmq
:pollitem
46 :socket
*bus-in
* :events zmq
:pollin
)))
48 ;; Subscribe to common bus
49 (zmq:setsockopt
*bus-in
* zmq
:subscribe
"ALL")
50 (zmq:connect
*bus-in
* *bus
*)
52 ;; Subscribe to our private bus
53 (zmq:setsockopt
*bus-in
* zmq
:subscribe
*me
*)
54 (zmq:connect
*bus-out
* *bus
*))
56 (defun shutdown-messaging ()
61 (defun send-msg (to cmd
&optional args
)
62 (let ((data (format nil
"~a ~a ~a ~a ~a" to
*me
* (incf *uid
*) cmd
(or args
""))))
63 (logger "[send-msg] ~s~%" data
)
64 (zmq:send
*bus-out
* (make-instance 'zmq
:msg
:data data
))))
66 (defun recv-msg (&optional
(timeout -
1))
67 (let ((ret (zmq:poll
*poll-in
* timeout
))
68 (msg (make-instance 'zmq
:msg
))
70 (when (not (null ret
))
71 (zmq:recv
*bus-in
* msg
)
72 (setq rep
(zmq:msg-data-as-string msg
))
73 (logger "[recv-msg] ~s~%" rep
))
77 "Splits string into parts."
78 (let (from to uid cmd args p1 p2
)
80 (setq p1
(search " " msg
)
83 p2
(search " " msg
:start2 p1
)
84 from
(subseq msg p1 p2
)
86 p1
(search " " msg
:start2 p2
)
87 uid
(subseq msg p2 p1
)
89 p2
(search " " msg
:start2 p1
)
90 cmd
(subseq msg p1 p2
)
91 args
(or (and p1
(subseq msg
(1+ p2
))) ""))
93 (values to from uid cmd args
)))
96 (eval (read-from-string expr
)))
98 (defun safe-eval (expr)
99 (logger "safe eval: ~a~%" expr
)
100 (let ((pid (sb-posix:fork
)))
103 (multiple-value-bind (pid status
)
104 (sb-posix:waitpid pid
0)
105 (declare (ignore pid
))
106 (if (zerop (ash status -
8))
108 (progn (logger "safe eval failed~%") nil
))))
113 (sb-ext:quit
:unix-status
1)))
114 (sb-ext:quit
:unix-status
0))
116 (progn (error "fork failed~%") nil
)))))
120 ((and (not *coordinator
*)
121 (string= (car *ids
*) *me
*))
122 ;; FIXME race condition
123 ;; If there's real coordinator, but we don't know about it yet and
124 ;; receive new COORDINATOR messages.
126 (zmq:setsockopt
*bus-in
* zmq
:subscribe
"COORDINATOR"))
127 (setq *coordinator
* t
)
128 (logger "I'm the coordinator~%"))
130 (string/= (car *ids
*) *me
*))
132 (zmq:setsockopt
*bus-in
* zmq
:unsubscribe
"COORDINATOR"))
133 ;; FIXME race condition
135 (setq *coordinator
* nil
)
136 (logger "New coordinator is ~a~%" (car *ids
*)))))