Novo fonte
[DistributedFileSystem.git] / src / main / java / org / hnaves / dfs / election / Election.java
blobc0b28a57536692da239c4bd31ec3ff525b8b3ca5
1 package org.hnaves.dfs.election;
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashSet;
6 import java.util.Set;
7 import java.util.logging.Level;
8 import java.util.logging.Logger;
10 import org.hnaves.dfs.Configurator;
11 import org.hnaves.dfs.election.messages.Accept;
12 import org.hnaves.dfs.election.messages.AcceptReply;
13 import org.hnaves.dfs.election.messages.AreYouCoordinator;
14 import org.hnaves.dfs.election.messages.AreYouCoordinatorReply;
15 import org.hnaves.dfs.election.messages.AreYouThere;
16 import org.hnaves.dfs.election.messages.AreYouThereReply;
17 import org.hnaves.dfs.election.messages.Invitation;
18 import org.hnaves.dfs.election.messages.Message;
19 import org.hnaves.dfs.election.messages.Ready;
20 import org.hnaves.dfs.election.messages.ReadyReply;
21 import org.hnaves.dfs.election.socket.Socket;
24 public class Election {
// Class-wide logger, named after the Election class.
25 private static final Logger LOG = Logger.getLogger(Election.class.getName());
// Transport used to receive and send election messages.
27 private final Socket socket;
// Identity of this node; the receiver thread ignores messages whose sender equals it.
28 private final Node id;
// Identifier of the group this node currently belongs to.
30 private long group;
// Coordinator of the current group; this node is the coordinator when it equals id.
31 private Node groupCoordinator;
// Current state of the node: null until start() is called, DOWN once stop() was called.
32 private Status status;
// Cleared before an AreYouThere probe is sent and updated from the AreYouThereReply.
34 private boolean isCoordinatorAlive;
// Discriminator of the latest AreYouCoordinator broadcast; replies carrying another value are ignored.
35 private long lastDiscriminator;
// Time in milliseconds at which an invitation last moved this node into ELECTION.
36 private long lastElectionTime;
// Time in milliseconds at which a positive AcceptReply last moved this node into REORGANIZATION.
37 private long lastReorganizationTime;
// Number of positive ReadyReply messages for the current group, reset by merge() before it sends Ready.
39 private int numReadyAnswers;
// Nodes whose Accept was acknowledged by this node while it was coordinating an election.
41 private final Set<Node> upNodes = new HashSet<Node>();
// Nodes that answered "yes" to the latest AreYouCoordinator broadcast.
42 private final Set<Node> otherCoordinators = new HashSet<Node>();
// Listeners notified on every status change.
44 private final ArrayList<ElectionListener> listeners = new ArrayList<ElectionListener>();
46 private class ReceiverThread extends Thread {
47 public ReceiverThread() { super("Receiver Thread"); }
48 @Override
49 public void start() {
50 setPriority(MIN_PRIORITY);
51 super.start();
54 @Override
55 public void run() {
56 while(true) {
57 synchronized(Election.this) {
58 if (status == Status.DOWN) break;
60 try {
61 Message message = socket.receiveMessage();
62 if (message != null && !message.getFrom().equals(id)) {
63 LOG.fine("Processing message " + message + "\n");
64 processMessage(message);
66 } catch(Throwable t) {
67 LOG.log(Level.SEVERE, "Exception while receiving message", t);
72 private final Thread receiverThread = new ReceiverThread();
75 private class MainThread extends Thread {
76 private MainThread() { super("Main Thread"); setPriority(Thread.MIN_PRIORITY); }
77 @Override
78 public void start() {
79 setPriority(MIN_PRIORITY);
80 super.start();
82 @Override
83 public void run() {
84 while(true) {
85 synchronized(Election.this) {
86 if (status == Status.DOWN) break;
88 try {
89 check();
90 merge();
91 recovery();
92 timeout();
93 } catch(Throwable t) {
94 LOG.log(Level.SEVERE, "Exception while running", t);
99 private final Thread mainThread = new MainThread();
/**
 * Creates an election participant. No thread is started until {@code start()} is called.
 *
 * @param id     identity of this node
 * @param socket transport used to exchange election messages
 * @throws NullPointerException if {@code id} or {@code socket} is null
 */
public Election(Node id, Socket socket) {
    if (id == null) {
        throw new NullPointerException("Id is null");
    }
    if (socket == null) {
        throw new NullPointerException("Socket is null");
    }
    this.socket = socket;
    this.id = id;
}
108 public Node getId() {
109 return id;
112 public synchronized Node getGroupCoordinatorIfNormal() {
113 if (status == Status.NORMAL) {
114 return groupCoordinator;
116 return null;
119 public synchronized Status getStatus() {
120 return status;
123 public synchronized long getGroup() {
124 return group;
127 public synchronized Node getGroupCoordinator() {
128 return groupCoordinator;
131 public synchronized void start() {
132 if (status == null) {
133 LOG.fine("Starting Node " + id + "...\n");
134 setStatus(Status.RECOVERY);
135 receiverThread.start();
136 mainThread.start();
140 public void stop() throws InterruptedException {
141 synchronized(this) {
142 LOG.fine("Stopping Node " + id + " ...\n");
143 setStatus(Status.DOWN);
145 mainThread.join();
146 receiverThread.join();
149 public synchronized void registerListener(ElectionListener listener) {
150 if (listener == null) throw new NullPointerException("Listener is null");
151 listeners.add(listener);
154 public synchronized void unregisterListener(ElectionListener listener) {
155 if (listener == null) throw new NullPointerException("Listener is null");
156 listeners.remove(listener);
160 private synchronized void processMessage(Message message) throws IOException {
161 if (message instanceof AreYouCoordinator) {
162 processAreYouCoordinator((AreYouCoordinator) message);
163 } else if (message instanceof AreYouCoordinatorReply) {
164 processAreYouCoordinatorReply((AreYouCoordinatorReply) message);
165 } else if (message instanceof AreYouThere) {
166 processAreYouThere((AreYouThere) message);
167 } else if (message instanceof AreYouThereReply) {
168 processAreYouThereReply((AreYouThereReply) message);
169 } else if (message instanceof Invitation) {
170 processInvitation((Invitation) message);
171 } else if (message instanceof Accept) {
172 processAccept((Accept) message);
173 } else if (message instanceof AcceptReply) {
174 processAcceptReply((AcceptReply) message);
175 } else if (message instanceof Ready) {
176 processReady((Ready) message);
177 } else if (message instanceof ReadyReply) {
178 processReadyReply((ReadyReply) message);
182 private void processAreYouCoordinator(AreYouCoordinator message) throws IOException {
183 AreYouCoordinatorReply reply;
184 if (status == Status.NORMAL && isGroupCoordinator()) {
185 reply = new AreYouCoordinatorReply(id, message.getFrom(), message.getDiscriminator(), true);
186 } else {
187 reply = new AreYouCoordinatorReply(id, message.getFrom(), message.getDiscriminator(), false);
189 sendMessage(reply);
192 private void processAreYouCoordinatorReply(AreYouCoordinatorReply reply) throws IOException {
193 if (reply.getReply() && reply.getDiscriminator() == lastDiscriminator) // To avoid unnecessary merges
194 otherCoordinators.add(reply.getFrom());
197 private void processAreYouThere(AreYouThere message) throws IOException {
198 AreYouThereReply reply;
199 if (group == message.getGroup() && isGroupCoordinator()) { // Do not check status, because non-coordinator processors become normal before the coordinator
200 reply = new AreYouThereReply(id, message.getFrom(), true);
201 } else {
202 reply = new AreYouThereReply(id, message.getFrom(), false);
204 sendMessage(reply);
207 private void processAreYouThereReply(AreYouThereReply reply) throws IOException {
208 if (!isGroupCoordinator())
209 isCoordinatorAlive = reply.getReply();
212 private void processInvitation(Invitation message) throws IOException {
213 if (status == Status.NORMAL) {
214 setStatus(Status.ELECTION);
215 lastElectionTime = getTime();
217 if (isGroupCoordinator()) {
218 for(Node nodeId : upNodes) {
219 Invitation invitation = new Invitation(id, nodeId, message.getGroupCoordinator(), message.getGroup());
220 sendMessage(invitation);
223 setGroupAndCoordinator(message.getGroup(), message.getGroupCoordinator());
224 Accept accept = new Accept(id, groupCoordinator, group);
225 sendMessage(accept);
229 private void processReady(Ready message) throws IOException {
230 ReadyReply reply;
231 if (status == Status.REORGANIZATION && group == message.getGroup()) {
232 setStatus(Status.NORMAL);
233 reply = new ReadyReply(id, message.getFrom(), true, group);
234 } else {
235 reply = new ReadyReply(id, message.getFrom(), false, group);
237 sendMessage(reply);
240 private void processReadyReply(ReadyReply reply) throws IOException {
241 if (reply.getReply() && reply.getGroup() == group) {
242 numReadyAnswers++;
246 private void processAccept(Accept message) throws IOException {
247 AcceptReply reply;
248 if (status == Status.ELECTION && isGroupCoordinator() && message.getGroup() == group) {
249 upNodes.add(message.getFrom());
250 reply = new AcceptReply(id, message.getFrom(), true);
251 } else {
252 reply = new AcceptReply(id, message.getFrom(), false);
254 sendMessage(reply);
257 private void processAcceptReply(AcceptReply reply) throws IOException {
258 if (status == Status.ELECTION && groupCoordinator.equals(reply.getFrom())) {
259 if (reply.getReply()) {
260 setStatus(Status.REORGANIZATION);
261 lastReorganizationTime = getTime();
266 private synchronized void recovery() {
267 if (status == Status.ELECTION && checkElectionTimeout())
268 setStatus(Status.RECOVERY);
269 if (status == Status.REORGANIZATION && checkReorganizationTimeout())
270 setStatus(Status.RECOVERY);
272 if (status != Status.RECOVERY && status != Status.DOWN) return;
274 setStatus(Status.ELECTION);
275 setGroupAndCoordinator(generateGroup(), id);
276 upNodes.clear();
277 setStatus(Status.REORGANIZATION);
278 setStatus(Status.NORMAL);
281 private synchronized void check() throws IOException, InterruptedException {
282 if (status != Status.NORMAL || !isGroupCoordinator()) return;
284 otherCoordinators.clear();
285 lastDiscriminator = generateDiscriminator();
286 AreYouCoordinator message = new AreYouCoordinator(id, Node.BROADCAST_NODE, lastDiscriminator);
287 sendMessage(message);
288 wait(Configurator.getAreYouCoordinatorTimeout());
289 wait((int) (Math.random() * Configurator.getAreYouCoordinatorRandomFactor()));
291 if (!otherCoordinators.isEmpty() && isGroupCoordinator()) {
292 LOG.fine("Other coordinator of " + id + " " + otherCoordinators + "\n");
293 setStatus(Status.MERGE);
297 private synchronized void merge() throws IOException, InterruptedException {
298 if (status != Status.MERGE) return;
300 setStatus(Status.ELECTION);
301 setGroupAndCoordinator(generateGroup(), id);
302 Set<Node> nodes = new HashSet<Node>(upNodes);
303 upNodes.clear();
304 nodes.addAll(otherCoordinators);
305 for(Node nodeId : nodes) {
306 Invitation invitationMessage = new Invitation(id, nodeId, groupCoordinator, group);
307 sendMessage(invitationMessage);
309 wait(Configurator.getInvitationTimeout());
311 setStatus(Status.REORGANIZATION);
313 numReadyAnswers = 0;
314 for(Node nodeId : upNodes) {
315 Ready readyMessage = new Ready(id, nodeId, group);
316 sendMessage(readyMessage);
319 wait(Configurator.getReadyTimeout());
320 if (numReadyAnswers < upNodes.size()) {
321 LOG.fine("Missing " + (upNodes.size() - numReadyAnswers) + " ready answers in " + id + "\n");
322 setStatus(Status.RECOVERY);
323 } else {
324 setStatus(Status.NORMAL);
328 private synchronized void timeout() throws IOException, InterruptedException {
329 if (status != Status.NORMAL || id.equals(groupCoordinator)) return;
331 AreYouThere message = new AreYouThere(id, groupCoordinator, group);
332 isCoordinatorAlive = false;
333 sendMessage(message);
334 wait(Configurator.getAreYouThereTimeout());
336 if (!isCoordinatorAlive) {
337 LOG.fine("Coordinator is not alive in node " + id + "!!!\n");
338 setStatus(Status.RECOVERY);
342 private boolean checkElectionTimeout() {
343 return (lastElectionTime + Configurator.getElectionTimeout() < getTime());
346 private boolean checkReorganizationTimeout() {
347 return (lastReorganizationTime + Configurator.getReorganizationTimeout() < getTime());
350 private long getTime() {
351 return System.currentTimeMillis();
354 private long generateDiscriminator() {
355 return getTime();
358 private long generateGroup() {
359 return generateDiscriminator();
362 private void setGroupAndCoordinator(long group, Node groupCoordinator) {
363 LOG.fine("Changing to group " + group + " coordinator " + groupCoordinator + " in node " + id + "\n");
364 this.group = group;
365 this.groupCoordinator = groupCoordinator;
368 private boolean isGroupCoordinator() {
369 return id.equals(groupCoordinator);
372 private void setStatus(Status status) {
373 LOG.fine("Entering " + status + " status in node " + id + "\n");
374 this.status = status;
375 for(ElectionListener listener : listeners) {
376 listener.onStateChange(this);
381 private void sendMessage(Message message) throws IOException {
382 LOG.fine("Sending message " + message + "\n");
383 socket.sendMessage(message);