
14. Behavioral Replication

Werner Van Belle¹ - werner@yellowcouch.org, werner.van.belle@gmail.com

¹ Programming Technology Lab (PROG), Department of Computer Science (DINF), Vrije Universiteit Brussel (VUB); Pleinlaan 2; 1050 Brussel; Belgium

Abstract :  In this exercise we replicate a single calculation across several processes and present the result transparently to the client. The aim is to create a robust, fault-tolerant system. This exercise was used in 2002-2003.

Reference:  Werner Van Belle; Behavioral Replication;


Introduction

In this exercise we want to replicate the behavior of a server process. The server process is a simple actor-based adder. Its code is included below. The client process is a simple actor that asks the adder to add two numbers. When the answer returns, the client verifies whether it is correct and, if so, asks the adder to perform a new addition. To start these processes, the script ./simple_start is included. The script ./stop can be used to stop all actors.
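The listings rely on an Actor base class and a Message interface that are not reproduced here. To make the message flow described above easier to follow, here is a minimal in-process sketch. Everything in it is an assumption made for illustration: the nested Actor and Message types are hypothetical stand-ins that use an in-memory queue instead of RMI, and the names LocalActorDemo, DemoAdder, DemoClient, Request, Answer, deliver and run do not come from the course framework.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Random;

class LocalActorDemo {
    // Hypothetical stand-in for the framework's message interface.
    interface Message {}

    // Hypothetical in-process stand-in for Actor; the real class uses RMI.
    static class Actor {
        static final Map<String, Actor> registry = new HashMap<>();
        static final Queue<Object[]> mailbox = new ArrayDeque<>();
        final String actor; // name of this actor, as in the listings

        Actor(String name) { actor = name; registry.put(name, this); }

        // Queue the message; it is handed over later by deliver().
        void sendMessage(String target, Message m) {
            mailbox.add(new Object[] { target, m });
        }

        void handleMessage(Message m) {}

        // Hand over at most 'max' queued messages in FIFO order.
        static void deliver(int max) {
            for (int n = 0; n < max && !mailbox.isEmpty(); n++) {
                Object[] entry = mailbox.remove();
                Actor target = registry.get((String) entry[0]);
                if (target != null) target.handleMessage((Message) entry[1]);
            }
        }
    }

    static class Request implements Message {
        final String comesfrom; final int a, b;
        Request(String comesfrom, int a, int b) {
            this.comesfrom = comesfrom; this.a = a; this.b = b;
        }
    }

    static class Answer implements Message {
        final int result;
        Answer(int result) { this.result = result; }
    }

    // Answers every request with the sum of its two numbers.
    static class DemoAdder extends Actor {
        DemoAdder(String name) { super(name); }
        @Override void handleMessage(Message m) {
            Request req = (Request) m;
            sendMessage(req.comesfrom, new Answer(req.a + req.b));
        }
    }

    // Sends two random numbers, checks the answer, then sends the next pair.
    static class DemoClient extends Actor {
        final String adder;
        final Random random = new Random();
        int a, b, verified;

        DemoClient(String name, String adder) {
            super(name);
            this.adder = adder;
            sendNumbers();
        }

        void sendNumbers() {
            a = random.nextInt();
            b = random.nextInt();
            sendMessage(adder, new Request(actor, a, b));
        }

        @Override void handleMessage(Message m) {
            Answer answer = (Answer) m;
            if (answer.result == a + b) { verified++; sendNumbers(); }
        }
    }

    // Runs 'rounds' request/answer round trips; returns how many were verified.
    static int run(int rounds) {
        Actor.registry.clear();
        Actor.mailbox.clear();
        new DemoAdder("Adder");
        DemoClient client = new DemoClient("Client1", "Adder");
        Actor.deliver(2 * rounds); // one request plus one answer per round
        return client.verified;
    }

    public static void main(String[] args) {
        System.out.println("verified answers: " + run(3));
    }
}
```

Messages are queued rather than delivered directly because the client sends its next request from inside handleMessage; with direct calls the two actors would call each other recursively without end.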

Adder.java

import java.rmi.RemoteException;

public class Adder
    extends Actor
{
    public Adder(String name, String args[]) throws RemoteException
    {
	super(name);
    }
    
    public void handleMessage(Message m)
    {
	AddRequest req = (AddRequest)m;
	sendMessage(req.comesfrom,
		    new AddAnswer(actor,req.a+req.b));
    }
    
    public static void main(String args[])
    {
	try
	    {
		new Adder(args[0],args);
	    }
	catch (Exception e)
	    {
		System.out.println(e);
		e.printStackTrace();
	    }
    }
}

AddRequest.java

public class AddRequest
  implements Message
{
    int a,b;
    String comesfrom;
    public AddRequest(String comes, int A, int B)
    {
	comesfrom = comes;
	a=A;
	b=B;
    }
}

AddAnswer.java

public class AddAnswer 
  implements Message
{
    int result;
    String comesfrom;
    public AddAnswer(String from, int r)
    {
	comesfrom = from;
	result = r;
    }
}

Client.java

import java.rmi.RemoteException;
import java.util.Random;

public class Client
    extends Actor
{
    // location of adder process
    String adder;
    // the two numbers that should be added
    int a;
    int b;
    Random random;

    public Client(String name, String args[]) throws RemoteException
    {
	super(name);
	adder=args[1];
	random=new Random();
	System.out.println("Client "+actor+" started");
	sendNumbers();
    }
    
    public void sendNumbers()
    {
	a=random.nextInt();
	b=random.nextInt();
	// 'actor' is the name of this process
	sendMessage(adder,new AddRequest(actor,a,b));
    }

    public void handleMessage(Message m)
    {
	AddAnswer answer = (AddAnswer)m;
	System.out.println(actor+" addition of "+a+" and "+b+" is "+answer.result);
	if (answer.result!=a+b)
	    System.out.println("\n>>>>>>>>>>> This is wrong ! <<<<<<<<<<<<<<<<<<\n\n\n");
	else
	    sendNumbers();
    }
    
    public static void main(String args[])
    {
	try
	    {
		new Client(args[0],args);
	    }
	catch (Exception e)
	    {
		System.out.println(e);
		e.printStackTrace();
	    }
    }
}

simple_start

#!/bin/bash
rmiregistry &
java Adder //127.0.0.1/Adder &
java Client //127.0.0.1/Client1 //127.0.0.1/Adder &

Exercise

The aim of this exercise is to replicate the behavior of the adder process in such a way that the 'outside' world still sees a single process, while multiple local processes each calculate the same request. See the picture.



To this end we will first modify the Adder server so that it announces itself to a proxy. Second, we will create a proxy that forwards incoming requests to all the different adders.

1. Modify the adder such that it will send out a connection-request to the proxy when it starts. The proxy will be given as an extra argument on the command-line.

2. Create a proxy actor that redirects incoming messages. It should offer the following functionality:
(a) when an AddRequest comes in, this message should be passed to all connected Adders
(b) when an AddAnswer arrives, it should be set aside until the answers from all adders have arrived, after which one answer is sent back to the client
(c) make sure that it works with multiple clients
(d) make sure that it works when adders are added afterward

The script proxy_start should be used to start all involved components.

proxy_start

#!/bin/bash
echo STARTING REGISTRY
rmiregistry  2>/dev/null &
sleep 5
echo STARTING PROXY
java Proxy //127.0.0.1/Adder   2>proxy_log &
sleep 5
echo STARTING FIRST ADDER
java ModifiedAdder //127.0.0.1/Adder1 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING FIRST CLIENT
java Client //127.0.0.1/Client1 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING SECOND ADDER
java ModifiedAdder //127.0.0.1/Adder2 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING THIRD ADDER
java ModifiedAdder //127.0.0.1/Adder3 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING SECOND CLIENT
java Client //127.0.0.1/Client2 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING FOURTH/FIFTH/SIXTH AND SEVENTH ADDER
java ModifiedAdder //127.0.0.1/Adder4 //127.0.0.1/Adder   2>/dev/null &
java ModifiedAdder //127.0.0.1/Adder5 //127.0.0.1/Adder   2>/dev/null &
java ModifiedAdder //127.0.0.1/Adder6 //127.0.0.1/Adder   2>/dev/null &
java ModifiedAdder //127.0.0.1/Adder7 //127.0.0.1/Adder   2>/dev/null &
sleep 5
echo STARTING THIRD/FOURTH/FIFTH CLIENT
java Client //127.0.0.1/Client3 //127.0.0.1/Adder   2>/dev/null &
java Client //127.0.0.1/Client4 //127.0.0.1/Adder   2>/dev/null &
java Client //127.0.0.1/Client5 //127.0.0.1/Adder   2>/dev/null &
echo ALL STARTED !!!

Solution

ConnectionRequest.java

public class ConnectionRequest
  implements Message
{
    String name;
    public ConnectionRequest(String comes)
    {
	name = comes;
    }
}

ModifiedAdder.java

import java.rmi.RemoteException;

public class ModifiedAdder
    extends Actor
{
    String proxy;
    
    public ModifiedAdder(String name, String args[]) throws RemoteException
    {
	super(name);
	proxy=args[1];
	sendMessage(proxy,new ConnectionRequest(actor));
    }
    
    public void handleMessage(Message m)
    {
	AddRequest req = (AddRequest)m;
	// System.out.println("Adder "+actor+" activated");
	sendMessage(req.comesfrom,
		    new AddAnswer(actor, req.a+req.b));
    }
    
    public static void main(String args[])
    {
	try
	    {
		new ModifiedAdder(args[0],args);
	    }
	catch (Exception e)
	    {
		System.out.println(e);
		e.printStackTrace();
	    }
    }
}

Proxy.java

import java.rmi.RemoteException;
import java.util.*;

class MessageInfo
{
    AddRequest req;
    HashMap client_answer;
    
    public MessageInfo(AddRequest r)
    {
	req = r;
	client_answer = new HashMap();
    }
    
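    // Returns the result of the first stored answer; the answers of the
    // different adders are not compared with each other.
    // Returns -1 when no adder was asked.
    public int answer()
    {
	Iterator it = client_answer.values().iterator();
	if (it.hasNext())
	    {
		AddAnswer answer = (AddAnswer)it.next();
		return answer.result;
	    }
	return -1;
    }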
    
    public boolean fullAnswer()
    {
	Iterator it = client_answer.values().iterator();
	while(it.hasNext())
	    {
		if (it.next()==null)
		    return false;
	    }
	return true;
    }

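    // Records that the request was forwarded to this adder;
    // a null value means its answer is still outstanding.
    public void sentTo(String client)
    {
	client_answer.put(client,null);
    }
    
    // Stores the answer if this adder was asked and has not answered
    // this request yet; returns false otherwise.
    public boolean acceptAnswer(AddAnswer answer)
    {
	String client = answer.comesfrom;
	if (!client_answer.containsKey(client))
	    return false;
	if (client_answer.get(client)!=null)
	    return false;
	client_answer.put(client,answer);
	return true;
    }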
}

public class Proxy
    extends Actor
{

    LinkedList queue;
    Vector connectedClients;
    
    public Proxy(String name, String args[]) throws RemoteException
    {
	super(name);
	queue=new LinkedList();
	connectedClients = new Vector();
    }
    
    private void sendToAllClients(AddRequest req)
    {
	// connectedClients holds the names of the adders that announced themselves
	if (connectedClients.isEmpty())
	    {
		System.out.println("Sorry, can't do, no active adders");
		System.exit(1);
	    }
	MessageInfo info = new MessageInfo(req);
	queue.addLast(info);
	Iterator it = connectedClients.iterator();
	while(it.hasNext())
	    {
		String client = (String)it.next();
		sendMessage(client,new AddRequest(actor,req.a,req.b));
		info.sentTo(client);
	    }
    }

    private void flushLoop()
    {
	// find out which requests have received all their answers
	// and forward a single answer to the requesting client
	Iterator it = queue.iterator();
	// System.out.println(" queue size = "+queue.size());
	while (it.hasNext())
	    {
		MessageInfo info = (MessageInfo)it.next();
		if (info.fullAnswer())
		    {
			it.remove();
			sendMessage(info.req.comesfrom,
				    new AddAnswer(actor,info.answer()));
		    }
	    }
    }
    
    public void handleMessage(Message m)
    {
	if (m instanceof ConnectionRequest)
	    {
		ConnectionRequest cr=(ConnectionRequest)m;
		connectedClients.add(cr.name);
		System.out.println("Proxy has new adder "+cr.name);
	    }
	else if (m instanceof AddRequest)
	    {
		AddRequest req = (AddRequest)m;		
		// send through the request
		sendToAllClients(req);
	    }
	else if (m instanceof AddAnswer)
	    {
		AddAnswer answer = (AddAnswer)m;
		Iterator it = queue.iterator();
		// store the answer in the earliest queued request that is still waiting for this adder
		while(it.hasNext())
		    {
			MessageInfo info = (MessageInfo)it.next();
			if (info.acceptAnswer(answer))
			    break;
		    }
		// forward every request for which all adders have now answered
		flushLoop();
	    }
	else
	    super.handleMessage(m);
	
    }
    
    public static void main(String args[])
    {
	try
	    {
		new Proxy(args[0],args);
	    }
	catch (Exception e)
	    {
		System.out.println(e);
		e.printStackTrace();
	    }
    }
}
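
The bookkeeping in MessageInfo is what makes requirement (d) work: each request remembers only the adders that were connected when it was forwarded, and an incoming answer is stored in the earliest queued request still waiting for that adder. The sketch below replays one small scenario in-process to illustrate this. It is a simplified illustration under stated assumptions, not part of the solution: the names PendingDemo, Pending, label and run are hypothetical, and plain integers stand in for AddAnswer messages.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PendingDemo {
    // Hypothetical, simplified counterpart of MessageInfo: one forwarded request.
    static class Pending {
        final String label; // illustrative tag for the request
        // adder name -> answer; null while the answer is outstanding
        final Map<String, Integer> answers = new LinkedHashMap<>();

        Pending(String label, List<String> addersAtSendTime) {
            this.label = label;
            for (String adder : addersAtSendTime) answers.put(adder, null);
        }

        // Accept only if this adder was asked and has not answered yet.
        boolean accept(String adder, int result) {
            if (!answers.containsKey(adder) || answers.get(adder) != null) return false;
            answers.put(adder, result);
            return true;
        }

        boolean complete() { return !answers.containsValue(null); }
    }

    // Replays a fixed scenario; returns request labels in completion order.
    static List<String> run() {
        List<String> adders = new ArrayList<>();
        List<Pending> queue = new ArrayList<>();
        List<String> completed = new ArrayList<>();

        adders.add("Adder1");
        queue.add(new Pending("r1", adders)); // r1 is sent to Adder1 only
        adders.add("Adder2");                 // Adder2 connects afterward
        queue.add(new Pending("r2", adders)); // r2 is sent to both adders

        // Order in which answers arrive at the proxy.
        String[] arrivals = { "Adder2", "Adder1", "Adder1" };
        for (String adder : arrivals) {
            // Store the answer in the earliest request waiting for this adder.
            for (Pending p : queue) {
                if (p.accept(adder, 42)) break;
            }
            // Flush every request that now has all its answers.
            Iterator<Pending> it = queue.iterator();
            while (it.hasNext()) {
                Pending p = it.next();
                if (p.complete()) { it.remove(); completed.add(p.label); }
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [r1, r2]
    }
}
```

In this scenario r1 completes as soon as Adder1 answers, even though Adder2 is connected by then, because Adder2 was never asked about r1. Like MessageInfo.answer(), the sketch does not compare the answers of the different adders.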
