;;; Fix for 0MQ 2.0.7.
;;; [julia.git] / utils.lisp
;;; blob 5f9d72d0da6cefaf0a2b867fa1fd594f49b5124e
(in-package :julia)

(defvar *bus* "epgm://lo;226.0.0.1:5555"
  "0MQ endpoint that INIT-MESSAGING connects both bus sockets to.")

(defvar *ctx* nil
  "0MQ context")
(defvar *bus-in* nil
  "Input bus")
(defvar *bus-out* nil
  "Output bus")
(defvar *poll-in* nil
  "List of pollitems watching *BUS-IN*, passed to ZMQ:POLL by RECV-MSG.")
;; Use the exported SB-EXT:POSIX-GETENV rather than the internal
;; SB-UNIX::POSIX-GETENV, which may change or vanish between SBCL releases.
;; NIL when the NODE environment variable is unset.
(defvar *me* (sb-ext:posix-getenv "NODE")
  "Our identifier (value of the NODE environment variable).")
(defvar *uid* 0
  "Sequential send-msg counter")
(defvar *ids* nil
  "Nodes present at bus")
(defvar *gen* 0
  "Generation ID")
(defvar *last-gen* 0
  "Last saved generation ID")
(defvar *feeds* nil
  "Data propagation database")
(defvar *cache* nil
  "Cache for replies")
(defvar *not-synced* t
  "Ask neighbours for missed updates.")
(defvar *save-core* nil
  "Flag: save core when possible.")
;; NOTE(review): not referenced in this part of the file; presumably the PID
;; of a core-saving process - TODO confirm and document.
(defvar *save-pid* 0)
;; NOTE(review): not referenced in this part of the file - TODO document.
(defvar *timer* nil)
(defvar *coordinator* nil
  "`t' if coordinator.")
(defun logger (control &rest args)
  "Write a log entry to standard output.
The entry is the current LOCAL-TIME:NOW timestamp and our id *ME*, followed
by CONTROL formatted with ARGS exactly as by FORMAT.  The caller supplies any
trailing newline (~%) in CONTROL.  Returns NIL."
  (format t "~a ~a> " (local-time:now) *me*)
  (apply #'format t control args)
  ;; Flush right away: SAFE-EVAL forks immediately after logging, and any
  ;; output still buffered at fork time would be written again by the child
  ;; when it exits, duplicating log lines.
  (finish-output))
(defun init-messaging ()
  "Create the 0MQ context and both bus sockets.
*BUS-IN* becomes a SUB socket subscribed to \"ALL\" and to our own id *ME*;
*BUS-OUT* becomes a PUB socket; both connect to *BUS*.  *POLL-IN* is set to a
one-element list holding a pollitem that watches *BUS-IN* with ZMQ:POLLIN."
  (setf *ctx* (zmq:init 1))
  (setf *bus-in* (zmq:socket *ctx* zmq:sub))
  (setf *bus-out* (zmq:socket *ctx* zmq:pub))
  (setf *poll-in*
        (list (make-instance 'zmq:pollitem
                             :socket *bus-in*
                             :events zmq:pollin)))
  ;; Messages addressed to every node.
  (zmq:setsockopt *bus-in* zmq:subscribe "ALL")
  (zmq:connect *bus-in* *bus*)
  ;; Messages addressed to this node only.
  (zmq:setsockopt *bus-in* zmq:subscribe *me*)
  (zmq:connect *bus-out* *bus*))
(defun shutdown-messaging ()
  "Close the input and output bus sockets (in that order), then terminate
the 0MQ context *CTX*.  Returns whatever ZMQ:TERM returns."
  (dolist (socket (list *bus-in* *bus-out*))
    (zmq:close socket))
  (zmq:term *ctx*))
(defun send-msg (to cmd &optional args)
  "Publish one message on *BUS-OUT* and log it.
The payload is the space-separated string \"TO FROM UID CMD ARGS\", where FROM
is *ME* and UID is the freshly incremented *UID* counter.  A NIL ARGS is
written as \"\", so the payload then ends with a separator space."
  (let* ((seq (incf *uid*))
         (payload (format nil "~a ~a ~a ~a ~a"
                          to *me* seq cmd (or args ""))))
    (logger "[send-msg] ~s~%" payload)
    (zmq:send *bus-out* (make-instance 'zmq:msg :data payload))))
(defun recv-msg (&optional (timeout -1))
  "Poll *POLL-IN* and, if the poll result is non-NIL, receive one message
from *BUS-IN*, log it and return its data as a string.  Returns NIL when the
poll result is NIL.  TIMEOUT is passed unchanged to ZMQ:POLL (default -1)."
  (let ((ready (zmq:poll *poll-in* timeout))
        (frame (make-instance 'zmq:msg)))
    (when ready
      (zmq:recv *bus-in* frame)
      (let ((text (zmq:msg-data-as-string frame)))
        (logger "[recv-msg] ~s~%" text)
        text))))
(defun hdr (msg)
  "Split MSG, a string of the form \"TO FROM UID CMD ARGS\", into its parts.
Returns five values: TO, FROM, UID, CMD and ARGS.  ARGS is everything after
the fourth space (it may itself contain spaces) and is \"\" when CMD is the
last field.  Fields absent from a truncated MSG are returned as NIL, and a
non-string MSG (e.g. the NIL RECV-MSG returns when nothing arrived) yields
five NILs."
  (let (to from uid cmd args)
    (when (stringp msg)
      (let ((start 0))
        (flet ((next-field ()
                 ;; Return the text from START up to the next space (or the
                 ;; end of MSG) and move START just past that space.  START
                 ;; becomes NIL once the end of MSG has been consumed, after
                 ;; which every further call returns NIL.
                 (when start
                   (let ((end (position #\Space msg :start start)))
                     (prog1 (subseq msg start end)
                       (setq start (and end (1+ end))))))))
          (setq to (next-field)
                from (next-field)
                uid (next-field)
                cmd (next-field)
                ;; Previously this tested the wrong index (P1 instead of P2),
                ;; so a message ending right after CMD raised an error that
                ;; was swallowed and ARGS came back NIL instead of "".
                args (cond (start (subseq msg start))
                           (cmd "")
                           (t nil))))))
    (values to from uid cmd args)))
(defun rep (expr)
  "Read the first Lisp form from the string EXPR, evaluate it, and return
the resulting value(s).  Any text after the first form is ignored.
SECURITY NOTE(review): this is unrestricted READ + EVAL (READ also honours #.
unless the caller binds *READ-EVAL* to NIL); if EXPR can arrive over the bus,
every peer on the bus can run arbitrary code here - confirm the bus is trusted."
  (let ((form (read-from-string expr)))
    (eval form)))
(defun safe-eval (expr)
  "Evaluate the Lisp form in the string EXPR, but only after a trial run in a
forked child process has succeeded.
The child evaluates EXPR (via REP) and exits with status 0, or with status 1
if the evaluation signals a serious condition.  The parent waits for the
child.  If the child exited normally with status 0, the parent evaluates EXPR
itself and returns T; otherwise it logs \"safe eval failed\" and returns NIL.
SECURITY NOTE(review): EXPR goes to EVAL in both processes; if it comes from
the bus this is remote code execution - confirm the source is trusted."
  (logger "safe eval: ~a~%" expr)
  (let ((pid (sb-posix:fork)))
    (cond
      ((plusp pid)
       ;; Parent: WAITPID returns (values pid status); only STATUS is needed.
       (let ((status (nth-value 1 (sb-posix:waitpid pid 0))))
         ;; Success requires a *normal* exit with status 0.  The old test
         ;; (zerop (ash status -8)) also accepted a child killed by a signal,
         ;; whose status has a zero exit-status byte.
         (if (and (sb-posix:wifexited status)
                  (zerop (sb-posix:wexitstatus status)))
             (progn (rep expr) t)
             (progn (logger "safe eval failed~%") nil))))
      ((zerop pid)
       ;; Child: trial evaluation.  Catch SERIOUS-CONDITION rather than just
       ;; ERROR so that e.g. stack exhaustion (a STORAGE-CONDITION) exits with
       ;; status 1 instead of leaving the child in the debugger while the
       ;; parent blocks forever in WAITPID.
       (handler-case
           (rep expr)
         (serious-condition ()
           (sb-ext:quit :unix-status 1)))
       (sb-ext:quit :unix-status 0))
      (t
       ;; Defensive: kept for a negative return from FORK.
       (error "fork failed~%")))))
(defun coordinate ()
  "Bring our coordinator role in line with the head of *IDS*.
If we are not the coordinator but (CAR *IDS*) is *ME*, subscribe *BUS-IN* to
\"COORDINATOR\", set *COORDINATOR* and log it.  If we are the coordinator but
(CAR *IDS*) is another node, unsubscribe from \"COORDINATOR\", clear
*COORDINATOR* and log the new coordinator.  Otherwise do nothing.
NOTE(review): when *IDS* is empty the head is NIL, and STRING= / STRING/=
compare it as the string \"NIL\"."
  ;; The two former UNWIND-PROTECT wrappers had no cleanup forms and so did
  ;; nothing; they have been dropped.
  (let ((head (car *ids*)))
    (cond
      ((and (not *coordinator*)
            (string= head *me*))
       ;; FIXME race condition
       ;; If there's a real coordinator that we don't know about yet, we
       ;; still receive new COORDINATOR messages.
       (zmq:setsockopt *bus-in* zmq:subscribe "COORDINATOR")
       (setq *coordinator* t)
       (logger "I'm the coordinator~%"))
      ((and *coordinator*
            (string/= head *me*))
       (zmq:setsockopt *bus-in* zmq:unsubscribe "COORDINATOR")
       ;; FIXME race condition
       ;; Contrary problem
       (setq *coordinator* nil)
       (logger "New coordinator is ~a~%" head)))))