Saída suspeita do código do Algoritmo Bully

Estou escrevendo um programa em Java que implementa o Algoritmo Bully.
Aqui está o código:

package newbully; public class NewBully { public static void main(String[] args) { int total_processes = 6; RunningThread[] t = new RunningThread[total_processes]; for (int i = 0; i < total_processes; i++) { t[i] = new RunningThread(new Process(i+1, i+1), total_processes);//passing process id, priority, total no. of processes to running thread } try { Election.initialElection(t); } catch (Exception e) { System.out.println("Possibly you are using null references in array"); } for (int i = 0; i  Recovered from Crash"); //Find current co-ordinator. } synchronized private void pingCoOrdinator() { try { if (Election.isPingFlag()) { synchronized (Election.lock) { Election.lock.wait(); } } if (!Election.isElectionFlag()) { Election.setPingFlag(true); System.out.println("Process[" + this.process.getPid() + "]: Are you alive?"); Socket outgoing = new Socket(InetAddress.getLocalHost(), 12345); outgoing.close(); Election.setPingFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } } catch (Exception ex) { //Initiate Election System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\nInitiating Election"); Election.setElectionFlag(true); Election.setPingFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } } synchronized private void executeJob() { int temp = r.nextInt(20); for (int i = 0; i <= temp; i++) { try { Thread.sleep(700); } catch (InterruptedException e) { System.out.println("Error Executing Thread:" + process.getPid()); System.out.println(e.getMessage()); } } } synchronized private boolean sendMessage() { boolean response = false; int i = 0; try { if (Election.isMessageFlag()) { synchronized (Election.lock) { Election.lock.wait(); } } Election.setMessageFlag(true); if (Election.isElectionFlag()) { for (i = this.process.getPid() + 1; i  Process[" + i + "] responded to election message successfully"); electionMessage.close(); response = true; } catch (Exception ex) { System.out.println("Process[" + 
this.process.getPid() + "] -> Process[" + i + "] did not respond to election message"); } } } Election.setMessageFlag(false); synchronized (Election.lock) { Election.lock.notifyAll(); } } catch (Exception ex1) { System.out.println(ex1.getMessage()); } return response; } synchronized private void serve() { try { //service counter Socket incoming = null; ServerSocket s = new ServerSocket(12345); for (int counter = 0; counter < 10; counter++) { incoming = s.accept(); System.out.println("Process[" + this.process.getPid() + "]:Yes"); Scanner scan = new Scanner(incoming.getInputStream()); PrintWriter out = new PrintWriter(incoming.getOutputStream(), true); if (scan.hasNextLine()) { if (scan.nextLine().equals("Who is the co-ordinator?")) { System.out.print("Process[" + this.process.getPid() + "]:"); out.println(this.process); } } } //after serving 10 requests go down for random time this.process.setCoOrdinatorFlag(false); this.process.setDownflag(true); try { incoming.close(); s.close(); sock[this.process.getPid() - 1].close(); Thread.sleep((this.r.nextInt(10) + 1) * 1000000);//going down recovery(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } catch (IOException ex) { System.out.println(ex.getMessage()); } } @Override public void run() { try { sock[this.process.getPid() - 1] = new ServerSocket(10000 + this.process.getPid()); } catch (IOException ex) { System.out.println(ex.getMessage()); } while (true) { if (process.isCoOrdinatorFlag()) { //serve other processes serve(); } else { while (true) { //Execute some task executeJob(); //Ping the co-ordinator pingCoOrdinator(); if (Election.isElectionFlag()) { if (!sendMessage()) {//elect self as co-ordinator System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]"); this.process.setCoOrdinatorFlag(true); Election.setElectionFlag(false); break; } } } } } } } package newbully; public class Election { private static boolean pingFlag = false; private static boolean electionFlag = 
false; private static boolean messageFlag = false; public static final Object lock = new Object(); public static boolean isMessageFlag() { return messageFlag; } public static void setMessageFlag(boolean messageFlag) { Election.messageFlag = messageFlag; } public static boolean isPingFlag() { return pingFlag; } public static void setPingFlag(boolean pingFlag) { Election.pingFlag = pingFlag; } public static boolean isElectionFlag() { return electionFlag; } public static void setElectionFlag(boolean electionFlag) { Election.electionFlag = electionFlag; } public static void initialElection(RunningThread[] t) { Process temp = new Process(-1, -1); for (int i = 0; i < t.length; i++) { if (temp.getPriority() < t[i].getProcess().getPriority()) { temp = t[i].getProcess(); } } t[temp.pid - 1].getProcess().CoOrdinatorFlag = true; } } package newbully; public class Process { int pid; boolean downflag,CoOrdinatorFlag; public boolean isCoOrdinatorFlag() { return CoOrdinatorFlag; } public void setCoOrdinatorFlag(boolean isCoOrdinator) { this.CoOrdinatorFlag = isCoOrdinator; } int priority; public boolean isDownflag() { return downflag; } public void setDownflag(boolean downflag) { this.downflag = downflag; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public int getPriority() { return priority; } public void setPriority(int priority) { this.priority = priority; } public Process() { } public Process(int pid, int priority) { this.pid = pid; this.downflag = false; this.priority = priority; this.CoOrdinatorFlag = false; } } 

Aqui está a saída:

 //--When delay in executeJob() method is 100 Process[4]: Are you alive? Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[1]: Are you alive? Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[1]: Are you alive? process[1]: -> Co-Ordinator is down Initiating Election Process[1] -> Process[2] responded to election message successfully Process[1] -> Process[3] responded to election message successfully Process[1] -> Process[4] responded to election message successfully Process[1] -> Process[5] responded to election message successfully Process[1] -> Process[6] did not respond to election message Process[2] -> Process[3] responded to election message successfully Process[3] -> Process[4] responded to election message successfully Process[4] -> Process[5] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] responded to election message successfully Process[3] -> Process[5] responded to election message successfully Process[5] -> Process[6] did not respond to election message New Co-Ordinator: Process[5] New Co-Ordinator: Process[1] Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind //--When delay in executeJob() method is 700 Process[3]: Are you alive? Process[6]:Yes Process[5]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[1]: Are you alive? Process[6]:Yes Process[6]:Yes Process[5]: Are you alive? Process[1]: Are you alive? Process[6]:Yes Process[6]:Yes Process[4]: Are you alive? Process[6]:Yes Process[3]: Are you alive? Process[6]:Yes Process[2]: Are you alive? Process[6]:Yes Process[1]: Are you alive? Process[6]:Yes Process[4]: Are you alive? 
process[4]: -> Co-Ordinator is down Initiating Election Process[4] -> Process[5] responded to election message successfully Process[4] -> Process[6] did not respond to election message Process[5] -> Process[6] did not respond to election message New Co-Ordinator: Process[5] Process[1]: Are you alive? Process[5]:Yes Process[1]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[2]: Are you alive? Process[5]:Yes Process[1]: Are you alive? Process[5]:Yes Process[4]: Are you alive? Process[5]:Yes Process[2]: Are you alive? Process[5]:Yes Process[4]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[3]: Are you alive? Process[5]:Yes Process[2]: Are you alive? process[2]: -> Co-Ordinator is down Initiating Election Process[2] -> Process[3] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] did not respond to election message Process[2] -> Process[6] did not respond to election message Process[3] -> Process[4] responded to election message successfully Process[3] -> Process[5] did not respond to election message Process[3] -> Process[6] did not respond to election message Process[1] -> Process[2] responded to election message successfully Process[1] -> Process[3] responded to election message successfully Process[1] -> Process[4] responded to election message successfully Process[1] -> Process[5] did not respond to election message Process[1] -> Process[6] did not respond to election message Process[2] -> Process[3] responded to election message successfully Process[2] -> Process[4] responded to election message successfully Process[2] -> Process[5] did not respond to election message Process[2] -> Process[6] did not respond to election message Process[4] -> Process[5] did not respond to election message Process[4] -> Process[6] did not respond to election message New Co-Ordinator: Process[4] Process[3]: Are you alive? 
Process[4]:Yes Process[3]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[2]: Are you alive? Process[4]:Yes Process[3]: Are you alive? Process[4]:Yes Process[1]: Are you alive? Process[4]:Yes Process[3]: Are you alive? process[3]: -> Co-Ordinator is down Initiating Election Process[3] -> Process[4] did not respond to election message Process[3] -> Process[5] did not respond to election message Process[3] -> Process[6] did not respond to election message New Co-Ordinator: Process[3] New Co-Ordinator: Process[2] Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind Address already in use: JVM_Bind 

Por fim, começo a receber a exceção Address already in use: JVM_Bind.
Além disso, se observarmos na saída a última eleição antes da exceção, vemos que um coordenador é eleito duas vezes antes que qualquer processo volte a perguntar se o coordenador está vivo ("Are you alive?").
Tenho certeza de que, quando um coordenador morre, dou a ele tempo suficiente para que não acorde nesse meio-tempo.
Quando aumento o delay, o programa vai adiante; caso contrário, ele para no meio.
Então, por que esse problema deve estar ocorrendo?

Eu encontrei o motivo da exceção
Está acontecendo porque, se você olhar de perto a saída logo antes da mensagem de exceção, o coordenador foi eleito duas vezes.
Sempre que um Thread é eleito como coordenador, ele abre um ServerSocket na porta 12345.
Como isso aconteceu duas vezes, a segunda abertura da porta pode estar lançando a exceção.
Mas eu não entendo… por que o coordenador foi eleito duas vezes?

Sua mensagem de erro em ambos os casos é "Address already in use: JVM_Bind" (endereço já em uso).

Esta mensagem é geralmente parte de um java.net.BindException que é lançado quando você tenta criar / abrir um Socket e a porta já está em uso. Nesse caso, você provavelmente está tentando abrir ou criar o mesmo soquete duas vezes.

Isso pode ocorrer porque, entre a abertura e o fechamento do soquete, uma exceção impede que socket.close() seja chamado. Nesse caso, você cria um soquete para o coordenador eleito, mas, como o host está "caído", uma exceção é lançada e, portanto, close() nunca é chamado.

Eu acho que você precisa adicionar a linha

electionMessage.close(); à cláusula catch desta parte do método sendMessage.

 // Election message to process i: a successful TCP connect to its election
 // port (10000 + i) is what this code treats as a response.
 try {
     Socket electionMessage = new Socket(InetAddress.getLocalHost(), 10000 + i);
     System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] responded to election message successfully");
     electionMessage.close();
     response = true;
 } catch (Exception ex) {
     //Add close here
     // NOTE(review): electionMessage is declared inside the try block, so calling
     // close() here requires moving the declaration before the try (and a null
     // check, because the constructor itself may be what threw).
     System.out.println("Process[" + this.process.getPid() + "] -> Process[" + i + "] did not respond to election message");
 }

Além disso, eu recomendaria adicionar o close() a todas as outras cláusulas catch relevantes, caso o problema esteja ocorrendo em outro lugar; de qualquer forma, é sempre uma boa prática para evitar problemas desse tipo. Também recomendo especificar claramente quais exceções você quer capturar em cada lugar, para não ser surpreendido por outras falhas.

Espero que seja um bom ponto de partida.


Editar em resposta ao primeiro comentário

Eu acho que há um problema com a maneira como você está sincronizando ("travando") seus threads. Você está contando com um flag booleano para indicar que um determinado ponto do código foi atingido. Mas o flag booleano não é controlado dentro de um bloco sincronizado, nem por outros bloqueios no código. Assim, vários threads podem passar pelo bloqueio e causar eventos inesperados, como várias tentativas de abrir o mesmo soquete.

Você está usando o seguinte código, com um objeto servindo de lock:

 // The flag is tested here and only set two statements later, and neither step
 // happens while holding Election.lock: the check and the set are not atomic.
 if (Election.isMessageFlag()) {
     synchronized (Election.lock) {
         Election.lock.wait();
     }
 }
 Election.setMessageFlag(true);
 if (Election.isElectionFlag()) {
     // Do Stuff
     // Open/close Sockets etc
 }
 Election.setMessageFlag(false);
 // Wake every thread parked in Election.lock.wait() above.
 synchronized (Election.lock) {
     Election.lock.notifyAll();
 }

Isso significa que mais de um thread pode passar pela primeira instrução if antes que o flag seja definido (na linha seguinte!) para fazer os threads subsequentes aguardarem.

No entanto, se você usar um ReentrantLock, use um código como este:

 // Guard the critical section with a real lock instead of a boolean flag.
 lock.lock(); // block until condition holds
 try {
     // Do Stuff
     // Open/close sockets etc.
 } finally {
     // Release in finally so the lock is freed even when the guarded code throws.
     lock.unlock();
 }

É claro que você ainda precisa definir algumas bandeiras para dizer se a eleição está ocorrendo, mas certifique-se de proteger o código com bloqueios reais, por exemplo, ReentrantLock, ou dentro de blocos sincronizados apropriados.

Espero que isto ajude

Depois de levar em conta todos os comentários acima, estou postando o código corrigido, que está funcionando, para que outras pessoas possam consultá-lo.
Qualquer sugestão de melhorias no código é bem vinda …

 // ===== File: NewBully.java =====
package newbully;

/**
 * Entry point of the Bully-algorithm simulation: creates the simulated
 * processes, picks the initial co-ordinator and starts one thread per process.
 */
public class NewBully {

    public static void main(String[] args) {
        int total_processes = 6;
        RunningThread[] t = new RunningThread[total_processes];
        for (int i = 0; i < total_processes; i++) {
            // pid and priority are both i + 1, so the last process has the highest priority
            t[i] = new RunningThread(new Process(i + 1, i + 1), total_processes);
        }
        // Every slot of t was filled above, so no null check is needed here.
        Election.initialElection(t);
        for (int i = 0; i < total_processes; i++) {
            new Thread(t[i]).start(); // start every simulated process
        }
    }
}

// ===== File: Election.java =====
package newbully;

import java.util.concurrent.locks.ReentrantLock;

/**
 * Election state shared by all simulated processes.
 *
 * The flags are volatile because they are written by one thread and polled by
 * the others without holding a common lock.
 */
public class Election {

    /** Serialises pings to the co-ordinator and the recovery handshake. */
    public static final ReentrantLock pingLock = new ReentrantLock();
    /** Serialises the sending of election messages. */
    public static final ReentrantLock electionLock = new ReentrantLock();

    private static volatile boolean electionFlag = false; // by default no election is going on
    private static volatile boolean pingFlag = true;      // by default processes are allowed to ping
    /** Process that detected the co-ordinator failure and started the current election. */
    public static volatile Process electionDetector;

    public static Process getElectionDetector() {
        return electionDetector;
    }

    public static void setElectionDetector(Process electionDetector) {
        Election.electionDetector = electionDetector;
    }

    public static boolean isPingFlag() {
        return pingFlag;
    }

    public static void setPingFlag(boolean pingFlag) {
        Election.pingFlag = pingFlag;
    }

    public static boolean isElectionFlag() {
        return electionFlag;
    }

    public static void setElectionFlag(boolean electionFlag) {
        Election.electionFlag = electionFlag;
    }

    /**
     * Marks the highest-priority process as the initial co-ordinator.
     * On a tie the first process found wins; an empty array is a no-op.
     *
     * @param t the runnables wrapping every simulated process
     */
    public static void initialElection(RunningThread[] t) {
        Process best = null;
        for (RunningThread candidateThread : t) {
            Process candidate = candidateThread.getProcess();
            if (best == null || best.getPriority() < candidate.getPriority()) {
                best = candidate;
            }
        }
        if (best != null) {
            // Set the flag on the winner itself instead of indexing the array by pid - 1.
            best.setCoOrdinatorFlag(true);
        }
    }
}

// ===== File: RunningThread.java =====
package newbully;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Scanner;

/**
 * One simulated process. While it is the co-ordinator it serves requests on
 * the co-ordinator port; otherwise it alternates between doing work, pinging
 * the co-ordinator and taking part in elections.
 */
public class RunningThread implements Runnable {

    /** Port the current co-ordinator listens on. */
    private static final int COORDINATOR_PORT = 12345;
    /** Process p listens for election messages on ELECTION_BASE_PORT + p. */
    private static final int ELECTION_BASE_PORT = 10000;
    /** How long a crashed co-ordinator stays down. */
    private static final long DOWN_TIME_MILLIS = 15000;
    /** Poll interval while waiting for a running election to finish. */
    private static final long ELECTION_POLL_MILLIS = 50;

    private Process process;
    private int total_processes;
    /** messageFlag[p - 1] is true once process p has sent its election messages. */
    private static boolean messageFlag[];
    ServerSocket[] sock;
    Random r;

    public Process getProcess() {
        return process;
    }

    public void setProcess(Process process) {
        this.process = process;
    }

    public RunningThread(Process process, int total_processes) {
        this.process = process;
        this.total_processes = total_processes;
        this.r = new Random();
        this.sock = new ServerSocket[total_processes];
        // A fresh boolean[] is already all false.
        RunningThread.messageFlag = new boolean[total_processes];
    }

    /** Opens this process's election port; a failure is reported, not propagated. */
    private void openElectionSocket() {
        int pid = this.process.getPid();
        try {
            sock[pid - 1] = new ServerSocket(ELECTION_BASE_PORT + pid);
        } catch (IOException ex) {
            System.out.println(ex.getMessage());
        }
    }

    /**
     * Brings this process back after a crash: reopens its election port, asks
     * the current co-ordinator who it is and, when this process has the higher
     * priority, makes it resign (the "bully" step).
     */
    synchronized private void recovery() {
        // Wait for a running election to finish. Sleep instead of spinning on the flag.
        while (Election.isElectionFlag()) {
            try {
                Thread.sleep(ELECTION_POLL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        final int myPid = this.process.getPid();
        System.out.println("Process[" + myPid + "]: -> Recovered from Crash");

        boolean becameCoOrdinator = false;
        Election.pingLock.lock(); // taken before the try so that finally always owns it
        try {
            Election.setPingFlag(false);
            // The process is up again, so it must answer election messages again
            // whether or not it ends up bullying the current co-ordinator.
            openElectionSocket();
            // try-with-resources closes the connection so the co-ordinator's read loop can end.
            try (Socket outgoing = new Socket(InetAddress.getLocalHost(), COORDINATOR_PORT);
                 Scanner scan = new Scanner(outgoing.getInputStream());
                 PrintWriter out = new PrintWriter(outgoing.getOutputStream(), true)) {
                System.out.println("Process[" + myPid + "]:-> Who is the co-ordinator?");
                out.println("Who is the co-ordinator?"); // autoflush is on, no explicit flush needed
                String coOrdinatorPid = scan.nextLine();
                int coOrdinatorPriority = Integer.parseInt(scan.nextLine());
                if (this.process.getPriority() > coOrdinatorPriority) { // bully condition
                    out.println("Resign");
                    System.out.println("Process[" + myPid + "]: Resign -> Process[" + coOrdinatorPid + "]");
                    if (scan.nextLine().equals("Successfully Resigned")) {
                        this.process.setCoOrdinatorFlag(true);
                        becameCoOrdinator = true;
                        System.out.println("Process[" + myPid + "]: -> Bullyed current co-ordinator Process[" + coOrdinatorPid + "]");
                    }
                } else {
                    out.println("Don't Resign");
                }
            }
        } catch (IOException | NoSuchElementException | NumberFormatException ex) {
            // No co-ordinator reachable, or it hung up or answered garbage.
            System.out.println(ex.getMessage());
        } finally {
            if (!becameCoOrdinator) {
                // Re-enable pinging unless this process takes over; in that case
                // serve() re-enables it once the co-ordinator port is open.
                Election.setPingFlag(true);
            }
            Election.pingLock.unlock();
        }
    }

    /** Pings the co-ordinator; a failed connect starts an election. */
    synchronized private void pingCoOrdinator() {
        Election.pingLock.lock();
        try {
            if (Election.isPingFlag()) {
                System.out.println("Process[" + this.process.getPid() + "]: Are you alive?");
                try (Socket outgoing = new Socket(InetAddress.getLocalHost(), COORDINATOR_PORT)) {
                    // A successful connect is the whole ping; nothing is sent.
                }
            }
        } catch (IOException ex) {
            // Publish the detector before raising the election flag, so a thread that
            // sees the flag never reads a missing detector.
            Election.setElectionDetector(this.process);
            Election.setPingFlag(false);
            Election.setElectionFlag(true);
            System.out.println("process[" + this.process.getPid() + "]: -> Co-Ordinator is down\n" + "process[" + this.process.getPid() + "]: ->Initiating Election");
        } finally {
            Election.pingLock.unlock();
        }
    }

    /** Simulates a unit of work by sleeping for a random time. */
    private void executeJob() {
        int temp = r.nextInt(20);
        for (int i = 0; i <= temp; i++) {
            try {
                Thread.sleep((temp + 1) * 100);
            } catch (InterruptedException e) {
                System.out.println("Error Executing Thread:" + process.getPid());
                System.out.println(e.getMessage());
                Thread.currentThread().interrupt(); // keep the interrupt visible to run()
                return;
            }
        }
    }

    /**
     * Sends an election message to every process with a higher pid.
     *
     * @return true when a higher process answered, or when it is not this
     *         process's turn to send; false means nobody higher is alive, so
     *         the caller should elect itself
     */
    synchronized private boolean sendMessage() {
        Election.electionLock.lock();
        try {
            final int myPid = this.process.getPid();
            Process detector = Election.getElectionDetector();
            boolean myTurn = Election.isElectionFlag()
                    && !RunningThread.isMessageFlag(myPid - 1)
                    && detector != null
                    && this.process.getPriority() >= detector.getPriority();
            if (!myTurn) {
                return true; // answering "someone responded" keeps the caller from electing itself
            }
            boolean response = false;
            for (int i = myPid + 1; i <= this.total_processes; i++) {
                try (Socket electionMessage = new Socket(InetAddress.getLocalHost(), ELECTION_BASE_PORT + i)) {
                    System.out.println("Process[" + myPid + "] -> Process[" + i + "] responded to election message successfully");
                    response = true;
                } catch (IOException ex) {
                    System.out.println("Process[" + myPid + "] -> Process[" + i + "] did not respond to election message");
                }
            }
            RunningThread.setMessageFlag(true, myPid - 1); // my message sending is done
            return response;
        } finally {
            Election.electionLock.unlock();
        }
    }

    // Synchronized on the class because the flags are read and reset by different threads.
    public static synchronized boolean isMessageFlag(int index) {
        return RunningThread.messageFlag[index];
    }

    public static synchronized void setMessageFlag(boolean messageFlag, int index) {
        RunningThread.messageFlag[index] = messageFlag;
    }

    /**
     * Co-ordinator loop: serves 5 to 9 connections on the co-ordinator port,
     * then simulates a crash followed by a recovery. Returns early when a
     * recovered higher-priority process asks this one to resign.
     */
    synchronized private void serve() {
        final int myPid = this.process.getPid();
        // try-with-resources releases the port on every exit path; otherwise the
        // next serve() call fails with "Address already in use".
        try (ServerSocket server = new ServerSocket(COORDINATOR_PORT)) {
            Election.setPingFlag(true);
            int quota = this.r.nextInt(5) + 5; // min 5 requests and max 9 requests
            for (int counter = 0; counter < quota; counter++) {
                try (Socket incoming = server.accept()) {
                    if (Election.isPingFlag()) {
                        System.out.println("Process[" + myPid + "]:Yes");
                    }
                    if (answerRequests(incoming, server)) {
                        return; // resigned
                    }
                }
            }
            // after serving the quota go down
            this.process.setCoOrdinatorFlag(false);
            this.process.setDownflag(true);
        } catch (IOException ex) {
            System.out.println(ex.getMessage());
            return;
        }
        crashAndRecover();
    }

    /**
     * Answers the requests arriving on one accepted connection.
     *
     * @return true when this process resigned as co-ordinator
     */
    private boolean answerRequests(Socket incoming, ServerSocket server) throws IOException {
        final int myPid = this.process.getPid();
        Scanner scan = new Scanner(incoming.getInputStream());
        PrintWriter out = new PrintWriter(incoming.getOutputStream(), true);
        // A plain ping sends nothing, so this loop ends as soon as the pinger disconnects.
        while (scan.hasNextLine()) {
            String line = scan.nextLine();
            if (line.equals("Who is the co-ordinator?")) {
                System.out.println("Process[" + myPid + "]:-> " + myPid);
                out.println(myPid);
                out.println(this.process.getPriority());
            } else if (line.equals("Resign")) {
                this.process.setCoOrdinatorFlag(false);
                // Free the port before acknowledging, so the successor can bind it
                // immediately after reading the reply.
                server.close();
                out.println("Successfully Resigned");
                System.out.println("Process[" + myPid + "]:-> Successfully Resigned");
                return true;
            } else if (line.equals("Don't Resign")) {
                return false; // conversation over; the state is per connection
            }
        }
        return false;
    }

    /** Simulates the crash: stops answering election messages, sleeps, then recovers. */
    private void crashAndRecover() {
        try {
            ServerSocket electionSocket = sock[this.process.getPid() - 1];
            if (electionSocket != null) {
                electionSocket.close();
            }
            Thread.sleep(DOWN_TIME_MILLIS); // going down
            this.process.setDownflag(false);
            recovery();
        } catch (IOException e) {
            System.out.println(e.getMessage());
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        openElectionSocket();
        while (!Thread.currentThread().isInterrupted()) {
            if (process.isCoOrdinatorFlag()) {
                serve(); // serve other processes
                continue;
            }
            executeJob();      // execute some task
            pingCoOrdinator(); // ping the co-ordinator
            if (Election.isElectionFlag() && !sendMessage()) {
                // Nobody with a higher pid answered: elect self as co-ordinator.
                Election.setElectionFlag(false); // election is done
                System.out.println("New Co-Ordinator: Process[" + this.process.getPid() + "]");
                this.process.setCoOrdinatorFlag(true);
                for (int i = 0; i < total_processes; i++) {
                    RunningThread.setMessageFlag(false, i);
                }
            }
        }
    }
}

// ===== File: Process.java =====
package newbully;

/** State of one simulated process: identity, priority and status flags. */
public class Process {

    int pid;
    // volatile: written by one simulated process and read by others
    volatile boolean downflag, CoOrdinatorFlag;
    int priority;

    public boolean isCoOrdinatorFlag() {
        return CoOrdinatorFlag;
    }

    public void setCoOrdinatorFlag(boolean isCoOrdinator) {
        this.CoOrdinatorFlag = isCoOrdinator;
    }

    public boolean isDownflag() {
        return downflag;
    }

    public void setDownflag(boolean downflag) {
        this.downflag = downflag;
    }

    public int getPid() {
        return pid;
    }

    public void setPid(int pid) {
        this.pid = pid;
    }

    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public Process() {
    }

    public Process(int pid, int priority) {
        this.pid = pid;
        this.downflag = false;
        this.priority = priority;
        this.CoOrdinatorFlag = false;
    }
}