1 package org
.hnaves
.dfs
.election
;
3 import java
.io
.IOException
;
4 import java
.util
.ArrayList
;
5 import java
.util
.HashSet
;
7 import java
.util
.logging
.Level
;
8 import java
.util
.logging
.Logger
;
10 import org
.hnaves
.dfs
.Configurator
;
11 import org
.hnaves
.dfs
.election
.messages
.Accept
;
12 import org
.hnaves
.dfs
.election
.messages
.AcceptReply
;
13 import org
.hnaves
.dfs
.election
.messages
.AreYouCoordinator
;
14 import org
.hnaves
.dfs
.election
.messages
.AreYouCoordinatorReply
;
15 import org
.hnaves
.dfs
.election
.messages
.AreYouThere
;
16 import org
.hnaves
.dfs
.election
.messages
.AreYouThereReply
;
17 import org
.hnaves
.dfs
.election
.messages
.Invitation
;
18 import org
.hnaves
.dfs
.election
.messages
.Message
;
19 import org
.hnaves
.dfs
.election
.messages
.Ready
;
20 import org
.hnaves
.dfs
.election
.messages
.ReadyReply
;
21 import org
.hnaves
.dfs
.election
.socket
.Socket
;
24 public class Election
{
/** Logger named after the Election class. */
25 private static final Logger LOG
= Logger
.getLogger(Election
.class.getName());
/** Transport used to receive and send election messages. */
27 private final Socket socket
;
/** Identity of this node; received messages whose sender equals it are ignored. */
28 private final Node id
;
/** Coordinator of the current group; equal to {@code id} when this node is the coordinator. */
31 private Node groupCoordinator
;
/** Current election status; {@code null} until start() is called. */
32 private Status status
;
/** Cleared by timeout() before probing the coordinator, set from the AreYouThereReply. */
34 private boolean isCoordinatorAlive
;
/** Discriminator of the most recent AreYouCoordinator broadcast; replies carrying another value are ignored. */
35 private long lastDiscriminator
;
/** Time (getTime()) at which an invitation moved this node into ELECTION. */
36 private long lastElectionTime
;
/** Time (getTime()) at which a positive AcceptReply moved this node into REORGANIZATION. */
37 private long lastReorganizationTime
;
/** Compared with upNodes.size() in merge(). NOTE(review): where it is reset/incremented is not visible here. */
39 private int numReadyAnswers
;
/** Nodes whose Accept was honoured by this node while it was coordinator in ELECTION. */
41 private final Set
<Node
> upNodes
= new HashSet
<Node
>();
/** Nodes that answered the latest AreYouCoordinator broadcast positively; cleared at the start of check(). */
42 private final Set
<Node
> otherCoordinators
= new HashSet
<Node
>();
/** Listeners notified by setStatus() on every status change. */
44 private final ArrayList
<ElectionListener
> listeners
= new ArrayList
<ElectionListener
>();
/**
 * Thread that pulls messages from the socket and hands them to processMessage().
 * NOTE(review): the run() header, the loop and the try statement are not visible in this view.
 */
46 private class ReceiverThread
extends Thread
{
47 public ReceiverThread() { super("Receiver Thread"); }
// Lowers this thread's priority to the minimum.
50 setPriority(MIN_PRIORITY
);
// The status is read under the Election monitor; DOWN ends the loop.
57 synchronized(Election
.this) {
58 if (status
== Status
.DOWN
) break;
61 Message message
= socket
.receiveMessage();
// Null results and messages sent by this very node are skipped.
62 if (message
!= null && !message
.getFrom().equals(id
)) {
63 LOG
.fine("Processing message " + message
+ "\n");
64 processMessage(message
);
// Any failure is logged at SEVERE level instead of being propagated.
66 } catch(Throwable t
) {
67 LOG
.log(Level
.SEVERE
, "Exception while receiving message", t
);
/** Receiver thread instance; started by start() and joined by stop(). */
72 private final Thread receiverThread
= new ReceiverThread();
/**
 * Background thread of the election; created with the name "Main Thread" and minimum priority.
 * NOTE(review): the run() header, the loop and the work done on each iteration are not
 * visible in this view.
 */
75 private class MainThread
extends Thread
{
76 private MainThread() { super("Main Thread"); setPriority(Thread
.MIN_PRIORITY
); }
// Minimum priority is requested again here, in addition to the constructor.
79 setPriority(MIN_PRIORITY
);
// The status is read under the Election monitor; DOWN ends the loop.
85 synchronized(Election
.this) {
86 if (status
== Status
.DOWN
) break;
// Any failure is logged at SEVERE level instead of being propagated.
93 } catch(Throwable t
) {
94 LOG
.log(Level
.SEVERE
, "Exception while running", t
);
/** Main thread instance. NOTE(review): where it is started and joined is not visible in this view. */
99 private final Thread mainThread
= new MainThread();
/**
 * Creates an election participant bound to a node identity and a message transport.
 *
 * @param id     identity of this node; must not be null
 * @param socket transport used to exchange election messages; must not be null
 * @throws NullPointerException if either argument is null
 */
// NOTE(review): the assignment of the id field is not visible in this view.
101 public Election(Node id
, Socket socket
) {
102 if (id
== null) throw new NullPointerException("Id is null");
103 if (socket
== null) throw new NullPointerException("Socket is null");
105 this.socket
= socket
;
108 public Node
getId() {
/**
 * Returns the group coordinator when the status is NORMAL.
 * NOTE(review): the result for any other status is not visible in this view.
 */
112 public synchronized Node
getGroupCoordinatorIfNormal() {
113 if (status
== Status
.NORMAL
) {
114 return groupCoordinator
;
119 public synchronized Status
getStatus() {
123 public synchronized long getGroup() {
127 public synchronized Node
getGroupCoordinator() {
128 return groupCoordinator
;
/**
 * Starts the node. Only acts while the status is still null, i.e. before any status was set.
 * NOTE(review): the end of this method is not visible in this view.
 */
131 public synchronized void start() {
132 if (status
== null) {
133 LOG
.fine("Starting Node " + id
+ "...\n");
// The node begins in RECOVERY, then the receiver thread is launched.
134 setStatus(Status
.RECOVERY
);
135 receiverThread
.start();
/**
 * Stops the node: switches the status to DOWN and waits for the receiver thread to finish.
 * NOTE(review): several lines of this method are not visible in this view.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
140 public void stop() throws InterruptedException
{
142 LOG
.fine("Stopping Node " + id
+ " ...\n");
// DOWN is the status the thread loops test in order to exit.
143 setStatus(Status
.DOWN
);
146 receiverThread
.join();
149 public synchronized void registerListener(ElectionListener listener
) {
150 if (listener
== null) throw new NullPointerException("Listener is null");
151 listeners
.add(listener
);
154 public synchronized void unregisterListener(ElectionListener listener
) {
155 if (listener
== null) throw new NullPointerException("Listener is null");
156 listeners
.remove(listener
);
160 private synchronized void processMessage(Message message
) throws IOException
{
161 if (message
instanceof AreYouCoordinator
) {
162 processAreYouCoordinator((AreYouCoordinator
) message
);
163 } else if (message
instanceof AreYouCoordinatorReply
) {
164 processAreYouCoordinatorReply((AreYouCoordinatorReply
) message
);
165 } else if (message
instanceof AreYouThere
) {
166 processAreYouThere((AreYouThere
) message
);
167 } else if (message
instanceof AreYouThereReply
) {
168 processAreYouThereReply((AreYouThereReply
) message
);
169 } else if (message
instanceof Invitation
) {
170 processInvitation((Invitation
) message
);
171 } else if (message
instanceof Accept
) {
172 processAccept((Accept
) message
);
173 } else if (message
instanceof AcceptReply
) {
174 processAcceptReply((AcceptReply
) message
);
175 } else if (message
instanceof Ready
) {
176 processReady((Ready
) message
);
177 } else if (message
instanceof ReadyReply
) {
178 processReadyReply((ReadyReply
) message
);
/**
 * Builds the answer to an AreYouCoordinator probe, echoing the probe's discriminator.
 * NOTE(review): the else line and the statement that sends the reply are not visible in this view.
 */
182 private void processAreYouCoordinator(AreYouCoordinator message
) throws IOException
{
183 AreYouCoordinatorReply reply
;
// A positive answer requires NORMAL status and this node being its group's coordinator.
184 if (status
== Status
.NORMAL
&& isGroupCoordinator()) {
185 reply
= new AreYouCoordinatorReply(id
, message
.getFrom(), message
.getDiscriminator(), true);
// Negative answer (presumably the else branch).
187 reply
= new AreYouCoordinatorReply(id
, message
.getFrom(), message
.getDiscriminator(), false);
192 private void processAreYouCoordinatorReply(AreYouCoordinatorReply reply
) throws IOException
{
193 if (reply
.getReply() && reply
.getDiscriminator() == lastDiscriminator
) // To avoid unnecessary merges
194 otherCoordinators
.add(reply
.getFrom());
/**
 * Builds the answer to an AreYouThere probe.
 * NOTE(review): the else line and the statement that sends the reply are not visible in this view.
 */
197 private void processAreYouThere(AreYouThere message
) throws IOException
{
198 AreYouThereReply reply
;
// Positive only when the probe names this node's group and this node coordinates it.
199 if (group
== message
.getGroup() && isGroupCoordinator()) { // Do not check status, because non-coordinator processors become normal before the coordinator
200 reply
= new AreYouThereReply(id
, message
.getFrom(), true);
// Negative answer (presumably the else branch).
202 reply
= new AreYouThereReply(id
, message
.getFrom(), false);
207 private void processAreYouThereReply(AreYouThereReply reply
) throws IOException
{
208 if (!isGroupCoordinator())
209 isCoordinatorAlive
= reply
.getReply();
/**
 * Handles an invitation to join another group.
 * NOTE(review): closing braces and the statement that sends the Accept are not visible in
 * this view, so the exact nesting of the statements below cannot be confirmed here.
 */
212 private void processInvitation(Invitation message
) throws IOException
{
// Invitations are only considered while the node is in NORMAL status.
213 if (status
== Status
.NORMAL
) {
214 setStatus(Status
.ELECTION
);
// Timestamp later compared by checkElectionTimeout().
215 lastElectionTime
= getTime();
// A coordinator relays the invitation to each node in upNodes.
217 if (isGroupCoordinator()) {
218 for(Node nodeId
: upNodes
) {
219 Invitation invitation
= new Invitation(id
, nodeId
, message
.getGroupCoordinator(), message
.getGroup());
220 sendMessage(invitation
);
// Adopt the inviter's group and coordinator, then build the Accept addressed to that coordinator.
223 setGroupAndCoordinator(message
.getGroup(), message
.getGroupCoordinator());
224 Accept accept
= new Accept(id
, groupCoordinator
, group
);
/**
 * Handles a Ready message.
 * NOTE(review): the declaration of reply, the else line and the statement that sends the
 * reply are not visible in this view.
 */
229 private void processReady(Ready message
) throws IOException
{
// Accepted only in REORGANIZATION and for the group this node currently belongs to.
231 if (status
== Status
.REORGANIZATION
&& group
== message
.getGroup()) {
232 setStatus(Status
.NORMAL
);
233 reply
= new ReadyReply(id
, message
.getFrom(), true, group
);
// Negative answer (presumably the else branch).
235 reply
= new ReadyReply(id
, message
.getFrom(), false, group
);
/**
 * Handles a ReadyReply; only positive replies for the current group pass the condition.
 * NOTE(review): the action taken when the condition holds is not visible in this view.
 */
240 private void processReadyReply(ReadyReply reply
) throws IOException
{
241 if (reply
.getReply() && reply
.getGroup() == group
) {
/**
 * Handles an Accept sent by a node answering an invitation.
 * NOTE(review): the declaration of reply, the else line and the statement that sends the
 * reply are not visible in this view.
 */
246 private void processAccept(Accept message
) throws IOException
{
// Honoured only while this node is the coordinator, in ELECTION, for the same group.
248 if (status
== Status
.ELECTION
&& isGroupCoordinator() && message
.getGroup() == group
) {
// The sender is recorded in upNodes.
249 upNodes
.add(message
.getFrom());
250 reply
= new AcceptReply(id
, message
.getFrom(), true);
// Negative answer (presumably the else branch).
252 reply
= new AcceptReply(id
, message
.getFrom(), false);
257 private void processAcceptReply(AcceptReply reply
) throws IOException
{
258 if (status
== Status
.ELECTION
&& groupCoordinator
.equals(reply
.getFrom())) {
259 if (reply
.getReply()) {
260 setStatus(Status
.REORGANIZATION
);
261 lastReorganizationTime
= getTime();
/**
 * Falls back to a group made of this node alone.
 * NOTE(review): one line between setGroupAndCoordinator and the REORGANIZATION transition is
 * not visible in this view.
 */
266 private synchronized void recovery() {
// An ELECTION or REORGANIZATION phase that outlived its timeout is turned into RECOVERY.
267 if (status
== Status
.ELECTION
&& checkElectionTimeout())
268 setStatus(Status
.RECOVERY
);
269 if (status
== Status
.REORGANIZATION
&& checkReorganizationTimeout())
270 setStatus(Status
.RECOVERY
);
// NOTE(review): a DOWN status also passes this guard, so the code below would bring a
// stopped node back to NORMAL - confirm this is intended.
272 if (status
!= Status
.RECOVERY
&& status
!= Status
.DOWN
) return;
// Walk through ELECTION and REORGANIZATION to NORMAL with a fresh group coordinated by this node.
274 setStatus(Status
.ELECTION
);
275 setGroupAndCoordinator(generateGroup(), id
);
277 setStatus(Status
.REORGANIZATION
);
278 setStatus(Status
.NORMAL
);
281 private synchronized void check() throws IOException
, InterruptedException
{
282 if (status
!= Status
.NORMAL
|| !isGroupCoordinator()) return;
284 otherCoordinators
.clear();
285 lastDiscriminator
= generateDiscriminator();
286 AreYouCoordinator message
= new AreYouCoordinator(id
, Node
.BROADCAST_NODE
, lastDiscriminator
);
287 sendMessage(message
);
288 wait(Configurator
.getAreYouCoordinatorTimeout());
289 wait((int) (Math
.random() * Configurator
.getAreYouCoordinatorRandomFactor()));
291 if (!otherCoordinators
.isEmpty() && isGroupCoordinator()) {
292 LOG
.fine("Other coordinator of " + id
+ " " + otherCoordinators
+ "\n");
293 setStatus(Status
.MERGE
);
/**
 * Runs a merge as the inviting coordinator; does nothing unless the status is MERGE.
 * NOTE(review): several interior lines (after the copy of upNodes, before the Ready loop,
 * and the else line near the end) are not visible in this view.
 */
297 private synchronized void merge() throws IOException
, InterruptedException
{
298 if (status
!= Status
.MERGE
) return;
// Start an election for a fresh group coordinated by this node.
300 setStatus(Status
.ELECTION
);
301 setGroupAndCoordinator(generateGroup(), id
);
// Invitees: the nodes currently in upNodes plus the other coordinators that were found.
302 Set
<Node
> nodes
= new HashSet
<Node
>(upNodes
);
304 nodes
.addAll(otherCoordinators
);
305 for(Node nodeId
: nodes
) {
306 Invitation invitationMessage
= new Invitation(id
, nodeId
, groupCoordinator
, group
);
307 sendMessage(invitationMessage
);
// Waiting releases the monitor so the synchronized message handlers can run meanwhile.
309 wait(Configurator
.getInvitationTimeout());
311 setStatus(Status
.REORGANIZATION
);
// Tell every node in upNodes that the new group is ready.
314 for(Node nodeId
: upNodes
) {
315 Ready readyMessage
= new Ready(id
, nodeId
, group
);
316 sendMessage(readyMessage
);
319 wait(Configurator
.getReadyTimeout());
// Fewer ready answers than upNodes members sends the node to RECOVERY.
320 if (numReadyAnswers
< upNodes
.size()) {
321 LOG
.fine("Missing " + (upNodes
.size() - numReadyAnswers
) + " ready answers in " + id
+ "\n");
322 setStatus(Status
.RECOVERY
);
// Presumably the else branch: all answers arrived.
324 setStatus(Status
.NORMAL
);
328 private synchronized void timeout() throws IOException
, InterruptedException
{
329 if (status
!= Status
.NORMAL
|| id
.equals(groupCoordinator
)) return;
331 AreYouThere message
= new AreYouThere(id
, groupCoordinator
, group
);
332 isCoordinatorAlive
= false;
333 sendMessage(message
);
334 wait(Configurator
.getAreYouThereTimeout());
336 if (!isCoordinatorAlive
) {
337 LOG
.fine("Coordinator is not alive in node " + id
+ "!!!\n");
338 setStatus(Status
.RECOVERY
);
342 private boolean checkElectionTimeout() {
343 return (lastElectionTime
+ Configurator
.getElectionTimeout() < getTime());
346 private boolean checkReorganizationTimeout() {
347 return (lastReorganizationTime
+ Configurator
.getReorganizationTimeout() < getTime());
350 private long getTime() {
351 return System
.currentTimeMillis();
354 private long generateDiscriminator() {
358 private long generateGroup() {
359 return generateDiscriminator();
/**
 * Logs the change and records the new group coordinator.
 * NOTE(review): the statement storing the group value is not visible in this view.
 */
362 private void setGroupAndCoordinator(long group
, Node groupCoordinator
) {
363 LOG
.fine("Changing to group " + group
+ " coordinator " + groupCoordinator
+ " in node " + id
+ "\n");
365 this.groupCoordinator
= groupCoordinator
;
368 private boolean isGroupCoordinator() {
369 return id
.equals(groupCoordinator
);
372 private void setStatus(Status status
) {
373 LOG
.fine("Entering " + status
+ " status in node " + id
+ "\n");
374 this.status
= status
;
375 for(ElectionListener listener
: listeners
) {
376 listener
.onStateChange(this);
381 private void sendMessage(Message message
) throws IOException
{
382 LOG
.fine("Sending message " + message
+ "\n");
383 socket
.sendMessage(message
);