Home | Papers | Reports | Projects | Code Fragments | Dissertations | Presentations | Posters | Proposals | Lectures given | Course notes |
<< 7. A Distributed Scheme Interpreter | 9. Asynchronous implementaion of a Vote for Wait/Continue strategy in Borg >> |
8. Thread Based Error Handling: Synchronous Voting on a Distributed WhiteboardWerner Van Belle1 - werner@yellowcouch.org, werner.van.belle@gmail.com Abstract : How can we deal with disconnects on a whiteboard given that the clients are written in a standard javaRMI fashion: sychronous calls. This excercise investigates the possibility of creating a vote/contiue/wait strategy. These excercies were used in 2000-2001; 2001-2002 and 2002-2003
Reference:
Werner Van Belle; Thread Based Error Handling: Synchronous Voting on a Distributed Whiteboard; |
Al hetgeen we tot nu toe gezien hebben ging ervan uit dat er weinig of geen fouten zouden optreden. De sockets lessen gingen ervan uit dat al de data mooi verzonden werd en mooi toekwam. Bij de RMI lessen gingen we ervan uit dat onze communicatie partner niet halverwege zijn execution het loodje aan maarten gaf (of dit een bestaande nederlandstalig uitdrukking is laat ik in het midden). We gingen er overal vanuit dat we het gedrag kende van de omgeving. In gedistribueerde omgevingen kan men hier niet vanuit gaan.
We
zullen dit illustreren door de eerder geschreven whiteboard ter hand
te pakken en lichtjes aan te passen, rekening houdend met een conceptuele
toevoeging van error-correctie gedrag op gebruikersniveau.
Een vaak gemaakte fout is te
veronderstellen dat uw client geen slechte intenties heeft en correct
gedrag vertoont. Illustreer in de servercode waar we hier
potentieel grote problemen kunnen door ondervinden. Neem de code van de
whiteboard van vorige maal (deze staat online op de server). Onderzoek
• Welke soorten RemoteExceptions Java kan gooien.
• Op welke plaatsen in de code we geen rekening hebben gehouden met
dergelijke excepties.
import java.rmi.Remote; import java.rmi.RemoteException; public interface Whiteboard extends Remote { void printObject(Object o) throws RemoteException; int join(Whiteboard client) throws RemoteException; }
import java.rmi.Remote; import java.rmi.RemoteException; public interface WhiteboardClientProtocol extends Remote { void printObject(Object o) throws RemoteException; boolean vote(String text) throws RemoteException; }
import java.applet.Applet; import java.awt.Graphics; import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; public class WhiteboardClient extends UnicastRemoteObject implements Whiteboard { static Whiteboard repeater = null; static int id; public WhiteboardClient() throws RemoteException { super(); } public void printObject(Object obj) throws RemoteException { System.out.println(obj); } public int join(Whiteboard obj) throws RemoteException { System.out.println("serious error"); System.exit(2); return -1; } public static void main(String args[]) { try { WhiteboardClient client=new WhiteboardClient(); repeater = (Whiteboard)Naming.lookup("//7.0.0.1/RepeatServer"); id=repeater.join(client); while(true) { Thread.currentThread().sleep(5); repeater.printObject("i am "+id); } } catch (Exception e) { System.out.println("Client exception: " + e.getMessage()); e.printStackTrace(); } } }
import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; public class WhiteboardServer extends UnicastRemoteObject implements Whiteboard { static int maxclients = 0; static Whiteboard clients[]; public WhiteboardServer() throws RemoteException { super(); } public void printObject(Object obj) throws RemoteException { int i; for(i=0;i<maxclients;i++) { if (clients[i]!=null) { try { clients[i].printObject(obj); } catch (Exception e) { System.out.println("client error... client removed.."); clients[i]=null; } } } } public int join(Whiteboard who) throws RemoteException { int i; for(i=0;i<maxclients;i++) { if (clients[i]==null) { clients[i]=who; return i; } } System.out.println("maximum number of clients reached..."); return -1; } public static void main(String args[]) { try { clients=new Whiteboard[maxclients]; WhiteboardServer obj = new WhiteboardServer(); Naming.rebind("//7.0.0.1/RepeatServer", obj); System.out.println("RepeatServer bound in registry"); } catch (Exception e) { System.out.println("WhiteboardServer err: " + e.getMessage()); e.printStackTrace(); } } }
import java.rmi.Remote; import java.rmi.RemoteException; public interface WhiteboardServerProtocol extends Remote { void printObject(Object o) throws RemoteException; int join(WhiteboardClientProtocol client, String name) throws RemoteException; }
Een andere fout die
vaak optreed is het disconnecteren van processen. (dat deze processen
op een Windows machine aan het draaien waren is meestal het geval,
maar dit is natuurlijk moeilijker aan te tonen, en ver buiten de
scope van het vak)
Hetgeen men in dergelijke situaties
alleen maar kan doen is proberen te herconnecteren met dat proces en
indien dit laatste mogelijk is, te synschoniseren met het proces.
Beschrijf hoe we dit kunnen doen in de whiteboard. Implementeer dit.
import java.rmi.Remote; import java.rmi.RemoteException; public interface ReconnectingWhiteboard extends Remote { void printObject(Object o) throws RemoteException; int join(Whiteboard client) throws RemoteException; int rejoin(Whiteboard client) throws RemoteException; }
import java.applet.Applet; import java.awt.Graphics; import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.util.*; public class ReconnectingWhiteboardClient extends UnicastRemoteObject implements ReconnectingWhiteboardClientProtocol { static ReconnectingWhiteboardServerProtocol repeater = null; static int id; static LinkedList messages; static String name; static int state; final static int CONNECTED = 1; final static int DISCONNECTED =2; final static int NOT_YET_CONNECTED = 3; public ReconnectingWhiteboardClient(String n) throws RemoteException { super(); messages = new LinkedList(); state=NOT_YET_CONNECTED; name = n; } synchronized public void printObject(Object obj) throws RemoteException { messages.addLast(obj); System.out.println(obj); } public void kill() throws RemoteException { System.exit(0); } synchronized public void sendMessage(Object msg) { try { repeater.printObject(msg); } catch (Exception e) { if (state == CONNECTED) state = DISCONNECTED; while(state != CONNECTED) { try { repeater = (ReconnectingWhiteboardServerProtocol)Naming.lookup("//7.0.0.1/RepeatServer"); if (state==NOT_YET_CONNECTED) id=repeater.join(name,this); else repeater.rejoin(name,this,messages); state = CONNECTED; } catch (Exception efd) { } } } } public static void main(String args[]) { try { ReconnectingWhiteboardClient client = new ReconnectingWhiteboardClient(args[0]); int nr=0; while(true) { Thread.currentThread().sleep(0); client.sendMessage("i am "+id+":"+(nr++)); } } catch (Exception e) { System.out.println("Client exception: " + e.getMessage()); e.printStackTrace(); } } }
import java.rmi.Remote; import java.rmi.RemoteException; public interface ReconnectingWhiteboardClientProtocol extends Remote { void printObject(Object o) throws RemoteException; void kill() throws RemoteException; }
import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.util.*; /** * The reconnecting whiteboard server has an extra function rejoin, shich is called * from the client when he rejoins. * If the client rejoins we send back all the events which has been passed * already. */ public class ReconnectingWhiteboardServer extends UnicastRemoteObject implements ReconnectingWhiteboardServerProtocol { static final int RECOVERING = 1; static final int REPEATING = 2; static final int EMPTY = 3; static final int CONNECTED = 4; static final int DISCONNECTED = 5; static final int NEW_MESSAGES = 6; static final int NO_NEW_MESSAGES = 7; static final int maxclients = 0; // the state of the server int state; // all messages which came in LinkedList messages; // the states of the clients ClientState clients[]; public ReconnectingWhiteboardServer() throws RemoteException { super(); messages=new LinkedList(); /** * initialise client state array */ clients=new ClientState[maxclients]; for(int i=0;i<maxclients;i++) clients[i]=new ClientState(null,null,EMPTY); /** * bind name global */ try { Naming.rebind("//7.0.0.1/RepeatServer", this); System.out.println("RepeatServer bound in registry"); } catch (Exception e) { System.out.println("WhiteboardServer err: " + e.getMessage()); e.printStackTrace(); } /** * We place the server initally in recovering mode * After 0 seconds we switch to REPEATING mode * and update all clients. */ synchronized(this) { state=RECOVERING; } try { Thread.currentThread().sleep(0); } catch (Exception e) { System.out.println("Sleep interrupted"); } synchronized(this) { state=REPEATING; updateClients(); } } synchronized public void printObject(Object obj) throws RemoteException { /** * add message in the queue and update all clients */ messages.addLast(obj); updateClients(); } synchronized private int addClient(String name, ReconnectingWhiteboardClientProtocol w) { /** * adding a client = finding a free position * an filling it with this information. The * state of the client is initially CONNECTED. */ for(int i=0;i<maxclients;i++) { if (clients[i].state==EMPTY) { clients[i]=new ClientState(name, w,CONNECTED); return i; } } System.out.println("maximum number of clients reached..."); return -1; } synchronized public int findClient(String name) { for(int i=0;i<maxclients;i++) if (name.compareTo(clients[i].name)==0) return i; return -1; } synchronized public boolean clientKnown(String name) { return findClient(name)>-1; } synchronized public int join(String name,ReconnectingWhiteboardClientProtocol who) throws RemoteException { /** * A join can only be called from a client we don't know yet */ int result=-1; if (clientKnown(name)) return -1; if (state==REPEATING) { /** * If the server is in repeating mode and we didn't * know the client, we'll add the client, * if already knew the client we return -1. * If we didn't knew the client, we have to update * him. */ result = addClient(name, who); updateClients(); } else if (state==RECOVERING) { /** * If we are recovering, we simply add the client, * the update will be send when switching to * REPEATING mode */ result = addClient(name, who); } else System.out.println("server unknown state "+state); return result; } synchronized public void rejoin(String name, ReconnectingWhiteboardClientProtocol who, LinkedList clientstate) throws RemoteException { if (state==REPEATING) { /** * If the client rejoins and the server is repeating * we update our view on the client state, thereby * ignoring & warning for 'new' messages. * If there are any new messages we kill the client. */ if (!clientKnown(name)) addClient(name,who); if (mergeState(name,clientstate,NO_NEW_MESSAGES)==NEW_MESSAGES) killClient(name); } if (state==RECOVERING) { /** * If the server is recovering, we add the client, * thereby allowing new messages. */ addClient(name, who); mergeState(name,clientstate,NEW_MESSAGES); } } synchronized public void updateClients() { /** * this method will loop over all the clients and send out * all new messages to the clients. */ Iterator message_iterator = messages.iterator(); while(message_iterator.hasNext()) { Object message = message_iterator.next(); for(int i=0;i<maxclients;i++) { ClientState client = clients[i]; if (client.state==CONNECTED && !client.hasreceived.contains(message)) { try { client.connectover.printObject(message); } catch (Exception e) { killClient(client.name); } } } } } synchronized private int mergeState(String who, LinkedList clientstate, int allow_new_messages) { /** * loops over all messages and checks wether both lists are the same. * only at the end of the list are new messages accepted. */ ClientState client = clients[findClient(who)]; Iterator known = messages.iterator(); Iterator unknown = clientstate.iterator(); while(known.hasNext() && unknown.hasNext()) { Object msg=known.next(); Object shouldbe=unknown.next(); if (msg.equals(shouldbe)) { client.hasreceived.addLast(msg); } else { System.out.println("new state didn't match"); killClient(who); } } if (allow_new_messages==NEW_MESSAGES && unknown.hasNext()) { while(unknown.hasNext()) messages.addLast(unknown.next()); return NEW_MESSAGES; } return NO_NEW_MESSAGES; } synchronized private void killClient(String name) { ClientState client=clients[findClient(name)]; if (client.state==CONNECTED) { try { client.connectover.kill(); } catch (Exception e) { // ignore any possible exception, the // client will be removed anyway } } client.state=DISCONNECTED; } public static void main(String args[]) { try { new ReconnectingWhiteboardServer(); } catch (Exception e) { System.out.println(e); e.printStackTrace(); } } } class ClientState { public int state; public String name; public LinkedList hasreceived; public ReconnectingWhiteboardClientProtocol connectover; public ClientState(String n,ReconnectingWhiteboardClientProtocol connect, int st) { state=st; connectover=connect; hasreceived=new LinkedList(); name=n; } }
import java.rmi.Remote; import java.rmi.RemoteException; import java.util.*; public interface ReconnectingWhiteboardServerProtocol extends Remote { void printObject(Object o) throws RemoteException; int join(String name, ReconnectingWhiteboardClientProtocol client) throws RemoteException; void rejoin(String name, ReconnectingWhiteboardClientProtocol client, LinkedList clientstate) throws RemoteException; }
Netwerk-errors kunenn vaak niet automatisch opgelost worden. Daartoe
willen we de bestaande whiteboard-implementatie aanpassen met een
vote/continue strategie:
Telkens een geconnecteerde client disconnect zonder te disjoinen,
zullen we al de andere gebruikers op de whiteboard vragen te stemmen of
verdere communicatie mogelijk is. Als voldoende gebruikers akkoord zijn
met het het verder gebruiken van de white-board wordt de
gedisconnecteerde eenvoudigweg genegeerd. In het andere geval, zal de
whiteboard alle communicatie opschorten tot de desbetreffende
gedisconnecteerde terug joined.
1- Bedenk een oplossing en teken een message diagram.Wees accuraat en houd rekening met de beperkingen van de Java RMI technologie !
2- Wat gebeurt er als een client
disconnect tijdens het voting process ? Hoe zal u dat opvangen ?
3- Implementeer deze vote-continue
strategie op een synchrone manier.
import java.applet.Applet; import java.awt.Graphics; import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.io.*; public class VotingWhiteboardClient extends UnicastRemoteObject implements WhiteboardClientProtocol { static WhiteboardServerProtocol repeater = null; static int id; public VotingWhiteboardClient() throws RemoteException { super(); } public void printObject(Object obj) throws RemoteException { System.out.println(obj); } static private String getText() { try { BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); String text = console.readLine(); return text; } catch (IOException e) { System.out.println("An IO Exception occured... Crashing out"); System.exit(0); } return null; } public boolean vote(String text) throws RemoteException { System.out.print(text+" has disconnected, wait or continue. Enter w or c"); text = getText(); System.out.println(text); return text.compareTo("c")==0; } public static void main(String args[]) { try { VotingWhiteboardClient client=new VotingWhiteboardClient(); repeater = (WhiteboardServerProtocol)Naming.lookup("//7.0.0.1/RepeatServer"); System.out.print("Enter nick: "); String nick = getText(); id=repeater.join(client,nick); while(true) { Thread.currentThread().sleep(5); repeater.printObject("i am "+nick); } } catch (Exception e) { System.out.println("Client exception: " + e.getMessage()); e.printStackTrace(); } } }
// nadelen van deze oplossing // - een vote wordt slechts doorgestuurd naar de volgende // in rij als de huidige geantwoord heeft // - een client kan niet herconnecteren zolang het voting process // bezig is // - concurrentieproblemen import java.rmi.Naming; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; import java.util.*; public class VotingWhiteboardServer extends UnicastRemoteObject implements WhiteboardServerProtocol { static Vector waitingList = new Vector(); static int maxclients = 0; static int voting = 0; static WhiteboardClientProtocol clients[]; static String nicks[]; public VotingWhiteboardServer() throws RemoteException { super(); } synchronized public void printObject(Object obj) throws RemoteException { if (voting>0 | !waitingList.isEmpty()) return; for(int i = 0 ; i < maxclients ; i++ ) { if (clients[i]!=null) { try { clients[i].printObject(obj); } catch (Exception e) { System.out.println("client error... client removed.."); vote(i); } } } } private void vote(int i) { String nick = nicks[i]; int total = 0; int cont = 0; // remove client and start voting voting++; clients[i]=null; nicks[i]=null; // obtain all results for(int j = 0 ; j < maxclients ; j ++) { if (clients[j]!=null) { try { if (clients[j].vote(nick)) cont++; total++; } catch (Exception e) { System.out.println("during voting client disconnected "+e); vote(j); } } } // continue or wait for that nick ? System.out.println("continue votes = "+cont+"/"+total+" voting = "+voting); if (cont<=total/2) waitingList.add(nick); voting--; } public int join(WhiteboardClientProtocol who, String name) throws RemoteException { if (voting>0) { System.out.println("voting... neglecting join request "+voting); return -1; } int i; for(i=0;i<maxclients;i++) { if (clients[i]==null) { clients[i]=who; nicks[i]=name; waitingList.remove(name); return i; } } System.out.println("maximum number of clients reached..."); return -1; } public static void main(String args[]) { try { clients = new WhiteboardClientProtocol[maxclients]; nicks = new String[maxclients]; VotingWhiteboardServer obj = new VotingWhiteboardServer(); Naming.rebind("//7.0.0.1/RepeatServer", obj); System.out.println("RepeatServer bound in registry"); } catch (Exception e) { System.out.println("WhiteboardServer err: " + e.getMessage()); e.printStackTrace(); } } }
http://werner.yellowcouch.org/ werner@yellowcouch.org |