Home Papers Reports Projects Code Fragments Dissertations Presentations Posters Proposals Lectures given Course notes
<< 7. A Distributed Scheme Interpreter9. Asynchronous implementaion of a Vote for Wait/Continue strategy in Borg >>

8. Thread Based Error Handling: Synchronous Voting on a Distributed Whiteboard

Werner Van Belle1 - werner@yellowcouch.org, werner.van.belle@gmail.com

1- Programming Technology Lab (PROG) Department of Computer Science (DINF) Vrije Universiteit Brussel (VUB); Pleinlaan 2; 1050 Brussel; Belgium

Abstract :  How can we deal with disconnects on a whiteboard given that the clients are written in a standard javaRMI fashion: sychronous calls. This excercise investigates the possibility of creating a vote/contiue/wait strategy. These excercies were used in 2000-2001; 2001-2002 and 2002-2003

Reference:  Werner Van Belle; Thread Based Error Handling: Synchronous Voting on a Distributed Whiteboard;
See also:
The exercise where we investigate asynchronous messaging in borg.


Info

Al hetgeen we tot nu toe gezien hebben ging ervan uit dat er weinig of geen fouten zouden optreden. De sockets lessen gingen ervan uit dat al de data mooi verzonden werd en mooi toekwam. Bij de RMI lessen gingen we ervan uit dat onze communicatie partner niet halverwege zijn execution het loodje aan maarten gaf (of dit een bestaande nederlandstalig uitdrukking is laat ik in het midden). We gingen er overal vanuit dat we het gedrag kende van de omgeving. In gedistribueerde omgevingen kan men hier niet vanuit gaan.

We zullen dit illustreren door de eerder geschreven whiteboard ter hand te pakken en lichtjes aan te passen, rekening houdend met een  conceptuele toevoeging van error-correctie gedrag op gebruikersniveau.

Opgave 0: Waar kan het foutlopen ?

Een vaak gemaakte fout is te veronderstellen dat uw client geen slechte intenties heeft en correct gedrag vertoont. Illustreer in de servercode waar we hier potentieel grote problemen kunnen door ondervinden. Neem de code van de whiteboard van vorige maal (deze staat online op de server). Onderzoek

• Welke soorten RemoteExceptions Java kan gooien.
• Op welke plaatsen in de code we geen rekening hebben gehouden met dergelijke excepties.

Whiteboard.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 

public interface Whiteboard extends Remote 
{
   void printObject(Object o) throws RemoteException; 
   int join(Whiteboard client) throws RemoteException;
}

WhiteboardClientProtocol.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 

public interface WhiteboardClientProtocol extends Remote 
{
    void printObject(Object o) throws RemoteException; 
    boolean vote(String text) throws RemoteException;
}

WhiteboardClient.java

import java.applet.Applet; 
import java.awt.Graphics; 
import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 

public class WhiteboardClient extends UnicastRemoteObject implements Whiteboard
{ 
   static Whiteboard repeater = null; 
   static int id;
   public WhiteboardClient() throws RemoteException 
     {	
	super();
     } 
   public void printObject(Object obj) throws RemoteException
     {
	System.out.println(obj);
     }
   
   public int join(Whiteboard obj) throws RemoteException
     {
	System.out.println("serious error");
	System.exit(2);
	return -1;
     }
   
   public static void main(String args[]) 
     { 
	try 
	  {
	     WhiteboardClient client=new WhiteboardClient();
	     repeater = (Whiteboard)Naming.lookup("//7.0.0.1/RepeatServer");
	     id=repeater.join(client);
	     while(true)
	       {
		  Thread.currentThread().sleep(5);
		  repeater.printObject("i am "+id);
	       }
	  }
	catch (Exception e)
	  {
	     System.out.println("Client exception: " + e.getMessage());
	     e.printStackTrace();
	  }
     }
}

WhiteboardServer.java

import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 

public class WhiteboardServer 
  extends UnicastRemoteObject 
  implements Whiteboard
{
   static int maxclients = 0; 
   static Whiteboard clients[];
   
   public WhiteboardServer() throws RemoteException 
     {
	super();
     } 
   public void printObject(Object obj) throws RemoteException
     {
	int i;
	for(i=0;i<maxclients;i++)
	  {
	     if (clients[i]!=null)
	       {
		  try 
		    {
		       clients[i].printObject(obj);
		    }
		  catch (Exception e)
		    {
		       System.out.println("client error... client removed..");
		       clients[i]=null;
		    }
	       }
	  }
     }
   
   public int join(Whiteboard who) throws RemoteException
     {
	int i;
	for(i=0;i<maxclients;i++)
	  {
	     if (clients[i]==null)
	       {
		  clients[i]=who;
		  return i;
	       }
	  }
	System.out.println("maximum number of clients reached...");
	return -1;
     }
   
   public static void main(String args[]) 
     {
	try 
	  {
	     clients=new Whiteboard[maxclients];
	     WhiteboardServer obj = new WhiteboardServer(); 
	     Naming.rebind("//7.0.0.1/RepeatServer", obj);
	     System.out.println("RepeatServer bound in registry");
	  } 
	catch (Exception e) 
	  { 
	     System.out.println("WhiteboardServer err: " + e.getMessage()); 
	     e.printStackTrace(); 
	  } 
     }
}

WhiteboardServerProtocol.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 

public interface WhiteboardServerProtocol
 extends Remote 
{
    void printObject(Object o) throws RemoteException; 
    int join(WhiteboardClientProtocol client, String name) throws RemoteException;
}

Opgave 1: Add Reconnection Behaviour

Een andere fout die vaak optreed is het disconnecteren van processen. (dat deze processen op een Windows machine aan het draaien waren is meestal het geval, maar dit is natuurlijk moeilijker aan te tonen, en ver buiten de scope van het vak)

Hetgeen men in dergelijke situaties alleen maar kan doen is proberen te herconnecteren met dat proces en indien dit laatste mogelijk is, te synschoniseren met het proces. Beschrijf hoe we dit kunnen doen in de whiteboard. Implementeer dit.

Oplossingen

ReconnectingWhiteboard.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 

public interface ReconnectingWhiteboard extends Remote 
{
   void printObject(Object o) throws RemoteException; 
   int join(Whiteboard client) throws RemoteException;
   int rejoin(Whiteboard client) throws RemoteException;
}

ReconnectingWhiteboardClient.java

import java.applet.Applet; 
import java.awt.Graphics; 
import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 
import java.util.*;

public class ReconnectingWhiteboardClient 
  extends UnicastRemoteObject 
  implements ReconnectingWhiteboardClientProtocol
{ 
   static ReconnectingWhiteboardServerProtocol repeater = null; 
   static int id;
   static LinkedList messages;
   static String name;
   static int state;
   final static int CONNECTED = 1;
   final static int DISCONNECTED =2;
   final static int NOT_YET_CONNECTED = 3;
   public ReconnectingWhiteboardClient(String n) throws RemoteException
     {	
	super();
	messages = new LinkedList();
	state=NOT_YET_CONNECTED;
	name = n;
     } 
   synchronized public void printObject(Object obj) throws RemoteException
     {
	messages.addLast(obj);
	System.out.println(obj);
     }
   public void kill() throws RemoteException
     {
	System.exit(0);
     }
   synchronized public void sendMessage(Object msg)
     {
	try
	  {
	     repeater.printObject(msg);
	  }
	catch (Exception e)
	  {
	     if (state == CONNECTED)
	       state = DISCONNECTED;
	     while(state != CONNECTED)
	       {
		  try 
		    {
		       repeater = (ReconnectingWhiteboardServerProtocol)Naming.lookup("//7.0.0.1/RepeatServer");
		       if (state==NOT_YET_CONNECTED)
			 id=repeater.join(name,this);
		       else 
			 repeater.rejoin(name,this,messages);
		       state = CONNECTED;
		    }
		  catch (Exception efd)
		    {  
		    }
	       }
	  }
     }
   
   public static void main(String args[]) 
     { 
	try 
	  {
	     ReconnectingWhiteboardClient client = new ReconnectingWhiteboardClient(args[0]);
	     int nr=0;
	     while(true)
	       {
		  Thread.currentThread().sleep(0);
		  client.sendMessage("i am "+id+":"+(nr++));
	       }
	  }
	catch (Exception e)
	  {
	     System.out.println("Client exception: " + e.getMessage());
	     e.printStackTrace();
	  }
     }
}

ReconnectingWhiteboardClientProtocol.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 

public interface ReconnectingWhiteboardClientProtocol extends Remote
{
   void printObject(Object o) throws RemoteException;
   void kill() throws RemoteException;
}

ReconnectingWhiteboardServer.java

import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 
import java.util.*;

/**
 * The reconnecting whiteboard server has an extra function rejoin, shich is called
 * from the client when he rejoins. 
 * If the client rejoins we send back all the events which has been passed 
 * already.
 */
public class ReconnectingWhiteboardServer 
  extends UnicastRemoteObject
  implements ReconnectingWhiteboardServerProtocol
{
   static final int RECOVERING = 1;
   static final int REPEATING = 2;
   static final int EMPTY = 3;
   static final int CONNECTED = 4;
   static final int DISCONNECTED = 5;
   static final int NEW_MESSAGES = 6;
   static final int NO_NEW_MESSAGES = 7;
   static final int maxclients = 0;
   // the state of the server
   int state;
   // all messages which came in
   LinkedList messages;
   // the states of the clients
   ClientState clients[];
   
   public ReconnectingWhiteboardServer() throws RemoteException 
     {
	super();
	messages=new LinkedList();
	/**
	 * initialise client state array
	 */
	clients=new ClientState[maxclients];
	for(int i=0;i<maxclients;i++)
	  clients[i]=new ClientState(null,null,EMPTY);
	/**
	 * bind name global
	 */
	try 
	  {
	     Naming.rebind("//7.0.0.1/RepeatServer", this);
	     System.out.println("RepeatServer bound in registry");
	  } 
	catch (Exception e) 
	  { 
	     System.out.println("WhiteboardServer err: " + e.getMessage()); 
	     e.printStackTrace(); 
	  } 
	/**
	 * We place the server initally in recovering mode
	 * After 0 seconds we switch to REPEATING mode
	 * and update all clients.
	 */
	synchronized(this)
	  {
	     state=RECOVERING;
	  }
	try
	  {
	     Thread.currentThread().sleep(0);
	  }
	catch (Exception e)
	  {
	     System.out.println("Sleep interrupted");
	  }
	synchronized(this)
	  {
	     state=REPEATING;
	     updateClients();
	  }    
     } 
   synchronized public void printObject(Object obj) throws RemoteException
     {
	/**
	 * add message in the queue and update all clients
	 */
	messages.addLast(obj);
	updateClients();
     }
   
   synchronized private int addClient(String name, ReconnectingWhiteboardClientProtocol w)
     {
	/**
	 * adding a client = finding a free position
	 * an filling it with this information. The
	 * state of the client is initially CONNECTED.
	 */
	for(int i=0;i<maxclients;i++)
	  {
	     if (clients[i].state==EMPTY)
	       {
		  clients[i]=new ClientState(name, w,CONNECTED);
		  return i;
	       }
	  }
	System.out.println("maximum number of clients reached...");
	return -1;
     }
   
   synchronized public int findClient(String name)
     {
	for(int i=0;i<maxclients;i++)
	  if (name.compareTo(clients[i].name)==0) return i;
	return -1;
     }
   synchronized public boolean clientKnown(String name)
     {
	return findClient(name)>-1;
     }
   synchronized public int join(String name,ReconnectingWhiteboardClientProtocol who) throws RemoteException
     {
	/**
	 * A join can only be called from a client we don't know yet
	 */
	int result=-1;
	if (clientKnown(name))
	  return -1;
	if (state==REPEATING)
	  {
	     /**
	      * If the server is in repeating mode and we didn't
	      * know the client, we'll add the client,
	      * if already knew the client we return -1. 
	      * If we didn't knew the client, we have to update
	      * him.
	      */
	     result = addClient(name, who);
	     updateClients();
	  }
	else if (state==RECOVERING)
	  {
	     /**
	      * If we are recovering, we simply add the client,
	      * the update will be send when switching to
	      * REPEATING mode
	      */
	     result = addClient(name, who);
	  }
	else 
	  System.out.println("server unknown state "+state);
	return result;
     }
   
   synchronized public void rejoin(String name, ReconnectingWhiteboardClientProtocol who, LinkedList clientstate) throws RemoteException
     {
	if (state==REPEATING)
	  {
	     /**
	      * If the client rejoins and the server is repeating
	      * we update our view on the client state, thereby
	      * ignoring & warning for 'new' messages.
	      * If there are any new messages we kill the client.
	      */
	     if (!clientKnown(name)) 
	       addClient(name,who);
	     if (mergeState(name,clientstate,NO_NEW_MESSAGES)==NEW_MESSAGES)
	       killClient(name);
	  }
	if (state==RECOVERING)
	  {
	     /**
	      * If the server is recovering, we add the client,
	      * thereby allowing new messages.
	      */
	     addClient(name, who);
	     mergeState(name,clientstate,NEW_MESSAGES);
	  }
     }
   
   synchronized public void updateClients()
     {
	/**
	 * this method will loop over all the clients and send out 
	 * all new messages to the clients.
	 */
	Iterator message_iterator = messages.iterator();
	while(message_iterator.hasNext())
	  {
	     Object message = message_iterator.next();
	     for(int i=0;i<maxclients;i++)
	       {
		  ClientState client = clients[i];
		  if (client.state==CONNECTED && !client.hasreceived.contains(message))
		    {
		       try 
			 {
			    client.connectover.printObject(message);
			 }
		       catch (Exception e)
			 {
			    killClient(client.name);
			 }
		    }
	       }
	  }
     }
   
   synchronized private int mergeState(String who, LinkedList clientstate, int allow_new_messages)
     {
	/**
	 * loops over all messages and checks wether both lists are the same.
	 * only at the end of the list are new messages accepted.
	 */
	ClientState client = clients[findClient(who)];
	Iterator known = messages.iterator();
	Iterator unknown = clientstate.iterator();
	while(known.hasNext() && unknown.hasNext())
	  {
	     Object msg=known.next();
	     Object shouldbe=unknown.next();
	     if (msg.equals(shouldbe))
	       {
		  client.hasreceived.addLast(msg);
	       }
	     else
	       {
		  System.out.println("new state didn't match");
		  killClient(who);
	       }
	  }
	if (allow_new_messages==NEW_MESSAGES &&
	    unknown.hasNext())
	  {
	     while(unknown.hasNext())
	       messages.addLast(unknown.next());
	     return NEW_MESSAGES;
	  }
	return NO_NEW_MESSAGES;
     }
   
   synchronized private void killClient(String name)
     {
	ClientState client=clients[findClient(name)];
	if (client.state==CONNECTED)
	  {
	     try
	       {
		  client.connectover.kill();
	       }
	     catch (Exception e)
	       {
		  // ignore any possible exception, the
		  // client will be removed anyway
	       }
	  }
	client.state=DISCONNECTED;
     }
   public static void main(String args[]) 
     {
	try 
	  {
	     new ReconnectingWhiteboardServer();
	  }
	
	catch (Exception e)
	  {
	     System.out.println(e);
	     e.printStackTrace();
	  }
     }
}


class ClientState
{
   public int state;
   public String name;
   public LinkedList hasreceived;
   public ReconnectingWhiteboardClientProtocol connectover; 
   public ClientState(String n,ReconnectingWhiteboardClientProtocol connect, int st)
     {
	state=st;
	connectover=connect;
	hasreceived=new LinkedList();
	name=n;
     }
}

ReconnectingWhiteboardServerProtocol.java

import java.rmi.Remote; 
import java.rmi.RemoteException; 
import java.util.*;

public interface ReconnectingWhiteboardServerProtocol extends Remote 
{
   void printObject(Object o) throws RemoteException; 
   int join(String name, ReconnectingWhiteboardClientProtocol client) throws RemoteException;
   void rejoin(String name, ReconnectingWhiteboardClientProtocol client, LinkedList clientstate) throws RemoteException;
}

Opgave 2: Synchrone Vote for Continue/Wait


Netwerk-errors kunenn vaak niet automatisch opgelost worden. Daartoe willen we de bestaande whiteboard-implementatie aanpassen met een vote/continue strategie:

Telkens een geconnecteerde client disconnect zonder te disjoinen, zullen we al de andere gebruikers op de whiteboard vragen te stemmen of verdere communicatie mogelijk is. Als voldoende gebruikers akkoord zijn met het het verder gebruiken van de white-board wordt de gedisconnecteerde eenvoudigweg genegeerd. In het andere geval, zal de whiteboard alle communicatie opschorten tot de desbetreffende gedisconnecteerde terug joined.

1- Bedenk een oplossing en teken een message diagram.Wees accuraat en houd rekening met de beperkingen van de Java RMI technologie !

2- Wat gebeurt er als een client disconnect tijdens het voting process ? Hoe zal u dat opvangen ?

3- Implementeer deze vote-continue strategie op een synchrone manier.

Oplossing

VotingWhiteboardClient.java

import java.applet.Applet; 
import java.awt.Graphics; 
import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 
import java.io.*;

public class VotingWhiteboardClient 
    extends UnicastRemoteObject 
    implements WhiteboardClientProtocol
{ 
    static WhiteboardServerProtocol repeater = null; 
    static int id;
    public VotingWhiteboardClient() throws RemoteException 
    {	
	super();
    } 
    
    public void printObject(Object obj) throws RemoteException
    {
	System.out.println(obj);
    }
    
    static private String getText()
    {
	try
	    {
		BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
		String text = console.readLine();
		return text;
	    }
	catch (IOException e)
	    {
		System.out.println("An IO Exception occured... Crashing out");
		System.exit(0);
	    }
	return null;
    }
    
    public boolean vote(String text) throws RemoteException
    {
	System.out.print(text+" has disconnected, wait or continue. Enter w or c");
	text = getText();
	System.out.println(text);
	return text.compareTo("c")==0;
    }
    
    public static void main(String args[]) 
    { 
	try 
	    {
		VotingWhiteboardClient client=new VotingWhiteboardClient();
		repeater = (WhiteboardServerProtocol)Naming.lookup("//7.0.0.1/RepeatServer");
		System.out.print("Enter nick: ");
		String nick = getText();
		id=repeater.join(client,nick);
		while(true)
		    {
			Thread.currentThread().sleep(5);
			repeater.printObject("i am "+nick);
		    }
	    }
	catch (Exception e)
	    {
		System.out.println("Client exception: " + e.getMessage());
		e.printStackTrace();
	    }
    }
}

VotingWhiteboardServer.java

// nadelen van deze oplossing
// - een vote wordt slechts doorgestuurd naar de volgende 
//   in rij als de huidige geantwoord heeft
// - een client kan niet herconnecteren zolang het voting process
//   bezig is
// - concurrentieproblemen

import java.rmi.Naming; 
import java.rmi.RemoteException; 
import java.rmi.server.UnicastRemoteObject; 
import java.util.*;

public class VotingWhiteboardServer 
  extends UnicastRemoteObject 
  implements WhiteboardServerProtocol
{
    static Vector waitingList = new Vector();
    static int maxclients = 0; 
    static int voting = 0;
    static WhiteboardClientProtocol clients[];
    static String nicks[];
    
    public VotingWhiteboardServer() throws RemoteException 
    {
	super();
    } 
    
    synchronized public void printObject(Object obj) throws RemoteException
    {
	if (voting>0 | !waitingList.isEmpty())
	    return;
	for(int i = 0 ; i < maxclients ; i++ )
	    {
		if (clients[i]!=null)
		    {
			try 
			    {
				clients[i].printObject(obj);
			    }
			catch (Exception e)
			    {
				System.out.println("client error... client removed..");
				vote(i);
			    }
		    }
	    }
    }

    private void vote(int i)
    {
	String nick = nicks[i];
	int total = 0;
	int cont = 0;
	// remove client and start voting
	voting++;
	clients[i]=null;
	nicks[i]=null;
	// obtain all results
	for(int j = 0 ; j < maxclients ; j ++)
	    {
		if (clients[j]!=null)
		    {
			try
			    {
				if (clients[j].vote(nick))
				    cont++;
				total++;
			    }
			catch (Exception e)
			    {
				System.out.println("during voting client disconnected "+e);
				vote(j);
			    }
		    }
	    }
	// continue or wait for that nick ?
	System.out.println("continue votes = "+cont+"/"+total+" voting = "+voting);
	if (cont<=total/2)
	    waitingList.add(nick);
	voting--;
    }

    public int join(WhiteboardClientProtocol who, String name) throws RemoteException
    {
	if (voting>0)
	     {
		 System.out.println("voting... neglecting join request "+voting);
		 return -1;
	     }
	int i;
	for(i=0;i<maxclients;i++)
	    {
		if (clients[i]==null)
		    {
			clients[i]=who;
			nicks[i]=name;
			waitingList.remove(name);
			return i;
		    }
	    }
	System.out.println("maximum number of clients reached...");
	return -1;
    }
    
    public static void main(String args[]) 
    {
	try 
	    {
		clients = new WhiteboardClientProtocol[maxclients];
		nicks = new String[maxclients];
		VotingWhiteboardServer obj = new VotingWhiteboardServer(); 
		Naming.rebind("//7.0.0.1/RepeatServer", obj);
		System.out.println("RepeatServer bound in registry");
	    } 
	catch (Exception e) 
	    { 
		System.out.println("WhiteboardServer err: " + e.getMessage()); 
		e.printStackTrace(); 
	    } 
    }
}

http://werner.yellowcouch.org/
werner@yellowcouch.org