
11. Asynchronous Fractal Calculation

Werner Van Belle1 - werner@yellowcouch.org, werner.van.belle@gmail.com

1- Programming Technology Lab (PROG) Department of Computer Science (DINF) Vrije Universiteit Brussel (VUB); Pleinlaan 2; 1050 Brussel; Belgium

Abstract :  In this exercise we convert a Julia set calculator into a distributed program. This lecture was given in 2001-2002 and 2002-2003.

Reference:  Werner Van Belle; Asynchronous Fractal Calculation;


Information

In the previous exercises we saw how thread-based systems can be used to implement distributed programs. We will now investigate what event-based systems offer for this purpose. Event-based systems go by many names: event-based systems, actor systems, component systems, Java Beans, and so on.

An event-based system is a system in which events are passed between different processes. Every process handles one event at a time. An event is not a function call: no answer is necessarily returned once it has been handled. After sending an event to another process, the sender simply continues; it does not wait. In general, an event-based system can be compared to a mailbox/mail system.
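The mailbox comparison can be made concrete with a small in-process sketch. It is not the RMI-based Actor class used in the rest of these notes; the class name Mailbox, the method deliver and the stop sentinel are hypothetical and exist only for this illustration. The sender enqueues events and continues immediately, while a single consumer thread handles them one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical in-process mailbox: send() never waits, events are handled one at a time. */
class Mailbox implements Runnable {
    private static final String STOP = "__stop__"; // sentinel used only in this sketch
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    /** Asynchronous send: enqueue the event and return immediately. */
    void send(String event) {
        inbox.add(event);
    }

    /** The single consumer: takes events one by one, in arrival order. */
    public void run() {
        try {
            while (true) {
                String event = inbox.take(); // blocks while the mailbox is empty
                if (event.equals(STOP)) return;
                handled.add(event);          // "handling" is just recording here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends all events, waits for the consumer to finish, returns what it handled. */
    static List<String> deliver(String... events) {
        Mailbox box = new Mailbox();
        Thread consumer = new Thread(box);
        consumer.start();
        for (String e : events) box.send(e); // the sender does not wait for handling
        box.send(STOP);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return box.handled;
    }

    public static void main(String[] args) {
        System.out.println(deliver("calc", "paint"));
    }
}
```

No answer flows back to the sender in this sketch. If one is needed, the receiver has to send an event of its own, which is what the returnTo field in the requests below is for.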

Example

By extending the Actor class we can create a process that can receive and send messages. To send a message to another process, use sendMessage. To receive messages, override the handleMessage routine. The code below shows how this works:

FractalCalcer.java

import java.rmi.*;
import java.lang.String;
import java.awt.*;
import java.awt.event.*;
import java.awt.image.*;

/**
 * calculates a julia set for the given coordinates and sends an answer
 * back
 */

public class FractalCalcer
  extends Actor
{ 
   Color colorscheme[];
   // image size in pixels; BufferedImage requires positive values
   // (256 is an assumed value)
   int XS = 256;
   int YS = 256;
   int PREC = 7;
   
   public FractalCalcer(String name) throws RemoteException
     {
	super(name);
	colorscheme=new Color[PREC+1];
	for (int i=0; i<PREC; i++)
	  {
	     int phase=5*i/PREC;
	     colorscheme[i]= new Color((phase+0)%6,phase,(phase+0)%6);
	  }
	colorscheme[PREC]=Color.black;
     }
   
   void drawMandleBrot(BufferedImage g, int w, int h)
     { 
	double x, y, t, cx, cy;
	int k;
	
	for (double i=-2; i<2; i+=4.0/w)
	  for (double j=-2; j<2; j+=4.0/h)
	    {
	       x=0; y=0;
	       cx = i; cy=j;
	       for (k=0; k<PREC; k++)
		 {
		    t = x*x-y*y+cx;
		    y = 2*x*y+cy;
		    x = t;
		    if (x*x+y*y>=4) break;
		 }
	       g.setRGB((int)((i+2)*w/4), (int)((j+2)*h/4),colorscheme[k].getRGB());
	    }
     }
   
   void drawJuliaBSM(BufferedImage g, double cx, double cy, int w, int h)
     { 
	double x, y, t;
	int k;
	for (double i=-2; i<2; i+=4.0/w)
	  for (double j=-2; j<2; j+=4.0/h)
	    {
	       x = i; y=j;
	       for (k=0; k<PREC; k++)
		 {
		    t = x*x-y*y+cx;
		    y = 2*x*y+cy;
		    x = t;
		    if (x*x+y*y >= 4) break;
		 }
	       g.setRGB((int)((i+2)*w/4), (int)((j+2)*h/4),colorscheme[k].getRGB());
	    }
     }
  
   public void handleMessage(Message m)
     {
	if (m instanceof CalcRequest)
	  {
	     // a CalcRequest asks us to calculate the Julia set
	     CalcRequest cr=(CalcRequest)m;
	     BufferedImage img;
	     img=new BufferedImage(XS, YS, BufferedImage.TYPE_INT_RGB);
	     drawJuliaBSM(img, 0, 0.8, XS, YS);
	     //drawMandleBrot(img, XS, YS);
	     // and return the answer to the requester
	     sendMessage(cr.returnTo,new ImageMessage(img,XS,YS));
	  }
	else
	  super.handleMessage(m);
     }
   
   public static void main(String args[])
     {
	try
	  {
	     new FractalCalcer(args[0]);
	  }
	catch (Exception e)
	  {
	     System.out.println(e);
	     e.printStackTrace();
	  }
     }
}

FractalServer.java

import java.awt.*;
import java.awt.event.*;
import java.awt.image.*;
import java.rmi.*;
import java.rmi.server.*;

class FractalBufferedImage
  extends Canvas
{ 
   BufferedImage image;
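   // window size in pixels (assumed value; should match the size of
   // the images produced by the calculator)
   int XS=256;
   int YS=256;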
   
   FractalBufferedImage()
     {
	Frame f = new Frame();
	f.setSize(XS,YS);
	f.add(this);
	f.setVisible(true);
     }
   
   public void setImage(BufferedImage img)
     {
	image=img;
	repaint(0);
     }
   
   public void paint(Graphics g)
     {
	if (image!=null)
	  g.drawImage(image,0,0,this);
     }
}

public class FractalServer
  extends Actor
{
   protected FractalBufferedImage painter;
   protected String client;
   public FractalServer(String name, String args[]) throws RemoteException
     {
	super(name);
	painter=new FractalBufferedImage();
	client=args[1];
	// send a calc-request to the first client
	sendMessage(client,new CalcRequest(name));
     }
   
   public void handleMessage(Message m)
     {
	if (m instanceof ImageMessage)
	  {
	     ImageMessage im=(ImageMessage)m;
	     painter.setImage(im.convertToBufferedImage());
	  }
	else 
	  super.handleMessage(m);
     }
   
   public static void main(String args[])
     {
	try
	  {
	     new FractalServer(args[0],args);
	  }
	catch (Exception e)
	  {
	     System.out.println(e);
	     e.printStackTrace();
	  }
     }
}

Setting it up

To start these two processes:

    javac *.java
    rmic Actor
    rmiregistry &
    java FractalCalcer //127.0.0.1/Calcer &
    java FractalServer //127.0.0.1/Server  //127.0.0.1/Calcer &

Exercise 1: Tiled Calculation

Calculating such a fractal takes (or at least used to take) a long time. Therefore we want to spread the calculation over multiple computers.
• Explain how you would do this.
• Implement this using the actor model presented above.
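One possible starting point, offered as a sketch rather than as the intended solution, is to cut the image into horizontal strips and let every calculator handle one strip. The class TileSplitter and its methods split and computeTile are hypothetical names. The iteration is the same as in drawJuliaBSM, but pixels are addressed by integer row and column so that every strip can be computed independently of the others.

```java
import java.util.Arrays;

/** Hypothetical helper that cuts an image into horizontal strips (tiles). */
class TileSplitter {
    /** Returns {firstRow, endRowExclusive} for each tile; rows are spread as evenly as possible. */
    static int[][] split(int height, int tiles) {
        int[][] ranges = new int[tiles][2];
        int base = height / tiles;
        int extra = height % tiles; // the first 'extra' tiles get one row more
        int row = 0;
        for (int t = 0; t < tiles; t++) {
            ranges[t][0] = row;
            row += base + (t < extra ? 1 : 0);
            ranges[t][1] = row;
        }
        return ranges;
    }

    /** Escape counts for rows firstRow..endRow-1 of a w x h image, row by row. */
    static int[] computeTile(double cx, double cy, int w, int h,
                             int firstRow, int endRow, int prec) {
        int[] out = new int[(endRow - firstRow) * w];
        for (int py = firstRow; py < endRow; py++) {
            for (int px = 0; px < w; px++) {
                // map the pixel to the square [-2,2) x [-2,2)
                double x = -2 + 4.0 * px / w;
                double y = -2 + 4.0 * py / h;
                int k;
                for (k = 0; k < prec; k++) {
                    double t = x * x - y * y + cx;
                    y = 2 * x * y + cy;
                    x = t;
                    if (x * x + y * y >= 4) break;
                }
                out[(py - firstRow) * w + px] = k;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // each range could travel in its own request to a different calculator
        for (int[] r : split(10, 3)) System.out.println(Arrays.toString(r));
    }
}
```

In the actor setting, the row range would be carried in the request message and echoed in the answer, so that the server knows where to paste each returned strip.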

Exercise 2: Animation

We want to visualize an animated Julia set as fast as possible. We animate it by changing the cx parameter over time: cx(t)=-1+0.02*t. The Julia sets are calculated by multiple computers.
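Because several computers work on different frames at the same time, frames can come back out of order. The sketch below shows one way to release them in sequence; the class FrameSequencer and its methods cxAt and arrive are hypothetical names, and a frame is represented by a plain String instead of an image to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer that releases frames strictly in order of their id. */
class FrameSequencer {
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextId = 1; // id of the next frame to display

    /** Parameter of frame t, following cx(t) = -1 + 0.02*t from the exercise. */
    static double cxAt(int t) {
        return -1 + 0.02 * t;
    }

    /** Registers an arrived frame and returns every frame that can now be shown, in order. */
    List<String> arrive(int id, String frame) {
        pending.put(id, frame);
        List<String> ready = new ArrayList<>();
        while (pending.containsKey(nextId)) {
            ready.add(pending.remove(nextId));
            nextId++;
        }
        return ready;
    }

    public static void main(String[] args) {
        FrameSequencer seq = new FrameSequencer();
        System.out.println(seq.arrive(2, "frame 2")); // frame 1 is still missing
        System.out.println(seq.arrive(1, "frame 1")); // releases frames 1 and 2
    }
}
```

Keying the waiting frames by id means that a late frame releases all of its successors in one step, without scanning a list.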

Extra Question

Some peer-to-peer messaging implementations offer only one incoming queue per actor. Others offer multiple queues. Why would such a feature be useful?
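One possible use, given here as an illustration and not as the only answer, is to let urgent control messages overtake bulk data that is already waiting. The sketch below uses a hypothetical class TwoQueueInbox with two internal queues; pop always serves the urgent queue first.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical inbox with two queues: urgent messages overtake normal ones. */
class TwoQueueInbox {
    private final Deque<String> urgent = new ArrayDeque<>();
    private final Deque<String> normal = new ArrayDeque<>();

    /** Asynchronous push into one of the two queues. */
    synchronized void push(String message, boolean isUrgent) {
        (isUrgent ? urgent : normal).addLast(message);
        notifyAll();
    }

    /** Blocking pop: waits until any message is available, urgent ones first. */
    synchronized String pop() {
        try {
            while (urgent.isEmpty() && normal.isEmpty()) wait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return urgent.isEmpty() ? normal.removeFirst() : urgent.removeFirst();
    }

    public static void main(String[] args) {
        TwoQueueInbox inbox = new TwoQueueInbox();
        inbox.push("frame 1", false);
        inbox.push("frame 2", false);
        inbox.push("stop animation", true);
        for (int i = 0; i < 3; i++) System.out.println(inbox.pop());
    }
}
```

With a single queue, the stop request in main would only be seen after both frames had been handled.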

Solutions

Actor.java

import java.rmi.*;
import java.rmi.server.*;
import java.lang.reflect.*;
import java.util.*;

class MessageQueue
{
   private boolean iswaiting=false;
   private Message busywith=null;
   private LinkedList ll = new LinkedList();
   
   /**
    * Creates a message queue, which can be used to implement a synchronous
    * receive at the consumer side and an asynchronous send at the producer
    * side. In other words: the push is asynchronous and puts a message in
    * the Q, while the pop blocks if there is no message available.
    */
   public MessageQueue()
     {
     }
   
   public synchronized void push(Message o)
     {
        ll.addLast(o);
        notify();
     }
   
   /**
    * Blocking pop to retrieve the next message on the Q. If the Q is empty
    * this method will block until a message is pushed upon the Q. This method
    * is used to synchronize two threads to each other using a consumer
    * producer like approach.
    */
   public synchronized Message pop()
     {
        try
          {
             // loop rather than 'if': wait() may wake up without a push
             while (ll.isEmpty())
               {
                  iswaiting=true;
                  busywith=null;
                  wait();
                  iswaiting=false;
               }
             busywith = (Message)ll.getFirst();
             ll.removeFirst();
             return busywith;
          }
        catch(Exception e)
          {
             e.printStackTrace();
          }
        return null;
     }
}


public class Actor
  extends UnicastRemoteObject
  implements ActorInterface, Runnable, Remote
{
   protected String actor;
   private MessageQueue messages;
   public Actor(String name) throws RemoteException
     {
        try
          {
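             actor=name;
             // create the queue before binding, so that a message arriving
             // right after the bind always finds a queue to land in
             messages=new MessageQueue();
             Naming.bind(name,this);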
          }
        catch (Exception e)
          {
             System.out.println(e);
             e.printStackTrace();
          }
        new Thread(this).start();
     }
   public void sendMessage(String target, Message m)
     {
        try
          {
             Remote rr=Naming.lookup(target);
             System.out.println(actor+" sends message");
             ActorInterface r = (ActorInterface)rr;
             r.receiveMessage(m);
          }
        catch (Exception e)
          {
             System.out.println(e);
             e.printStackTrace();
          }
     }
   public void receiveMessage(Message m) throws RemoteException
     {
        try
          {
             System.out.println(actor+" received message");
             messages.push(m);
          }
        catch (Exception e)
          {
             System.out.println(e);
             e.printStackTrace();
          }
     }
   public void run()
     {
        while(true)
          {
             Message m=messages.pop();
             try
               {
                  System.out.println(actor+" starts handling message");
                  handleMessage(m);
                  System.out.println(actor+" stops handling message");
               }
             catch (Exception e)
               {
                  System.out.println("While handling message an error occurred: " + e);
                  e.printStackTrace();
               }
          }
     }
   public void handleMessage(Message m)
     {
        System.out.println("Please define a handleMessage(Message m) in your subclass");
     }
}

ActorInterface.java

import java.rmi.*;

public interface ActorInterface
  extends java.rmi.Remote
{
   public void receiveMessage(Message m) throws RemoteException;
}

AnimatedFractalCalcer.java

import java.rmi.*;
import java.lang.String;
import java.awt.*;
import java.awt.event.*;
import java.awt.image.*;

/**
 * calculates a julia set for the given coordinates and sends an answer
 * back
 */

public class AnimatedFractalCalcer
  extends Actor
{ 
   Color colorscheme[];
   // image size in pixels; BufferedImage requires positive values
   // (256 is an assumed value)
   int XS = 256;
   int YS = 256;
   int PREC = 7;
   
   public AnimatedFractalCalcer(String name) throws RemoteException
     {
	super(name);
	colorscheme=new Color[PREC+1];
	for (int i=0; i<PREC; i++)
	  {
	     int phase=5*i/PREC;
	     colorscheme[i]= new Color((phase+0)%6,phase,(phase+0)%6);
	  }
	colorscheme[PREC]=Color.black;
     }
   
   void drawMandleBrot(BufferedImage g, int w, int h)
     { 
	double x, y, t, cx, cy;
	int k;
	
	for (double i=-2; i<2; i+=4.0/w)
	  for (double j=-2; j<2; j+=4.0/h)
	    {
	       x=0; y=0;
	       cx = i; cy=j;
	       for (k=0; k<PREC; k++)
		 {
		    t = x*x-y*y+cx;
		    y = 2*x*y+cy;
		    x = t;
		    if (x*x+y*y>=4) break;
		 }
	       g.setRGB((int)((i+2)*w/4), (int)((j+2)*h/4),colorscheme[k].getRGB());
	    }
     }
   
   void drawJuliaBSM(BufferedImage g, double cx, double cy, int w, int h)
     { 
	double x, y, t;
	int k;
	for (double i=-2; i<2; i+=4.0/w)
	  for (double j=-2; j<2; j+=4.0/h)
	    {
	       x = i; y=j;
	       for (k=0; k<PREC; k++)
		 {
		    t = x*x-y*y+cx;
		    y = 2*x*y+cy;
		    x = t;
		    if (x*x+y*y >= 4) break;
		 }
	       g.setRGB((int)((i+2)*w/4), (int)((j+2)*h/4),colorscheme[k].getRGB());
	    }
     }
  
   public void handleMessage(Message m)
     {
	if (m instanceof JuliaRequest)
	  {
	     // a JuliaRequest asks us to calculate the Julia set for (cx,cy)
	     JuliaRequest cr=(JuliaRequest)m;
	     BufferedImage img;
	     img=new BufferedImage(XS, YS, BufferedImage.TYPE_INT_RGB);
	     drawJuliaBSM(img, cr.cx, cr.cy, XS, YS);
	     sendMessage(cr.returnTo,new ImageMessage(img,XS,YS,actor,cr.id));
	  }
	else
	  super.handleMessage(m);
     }
   
   public static void main(String args[])
     {
	try
	  {
	     new AnimatedFractalCalcer(args[0]);
	  }
	catch (Exception e)
	  {
	     System.out.println(e);
	     e.printStackTrace();
	  }
     }
}

CalcRequest.java

public class CalcRequest 
  implements Message
{
   public String returnTo;
   public CalcRequest(String returnImageTo)
     {
	returnTo=returnImageTo;
     }
}

FractalAnimator.java

import java.util.*;
import java.awt.*;
import java.awt.event.*;
import java.awt.image.*;
import java.rmi.*;
import java.rmi.server.*;

class FractalBufferedImage
  extends Canvas
{ 
   BufferedImage image;
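   // window size in pixels (assumed value; should match the size of
   // the images produced by the calculators)
   int XS=256;
   int YS=256;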
   
   FractalBufferedImage()
     {
	Frame f = new Frame();
	f.setSize(XS,YS);
	f.add(this);
	f.setVisible(true);
     }
   
   public void setImage(BufferedImage img)
     {
	image=img;
	repaint(0);
     }
   
   public void paint(Graphics g)
     {
	if (image!=null)
	  g.drawImage(image,0,0,this);
     }
}

public class FractalAnimator
  extends Actor
{
   protected FractalBufferedImage painter;
   protected String clients[];
   protected double x, dx;
   protected int sendid, nextid;
   protected Vector inQueue;
   public FractalAnimator(String name, String args[]) throws RemoteException
     {
	super(name);
	painter=new FractalBufferedImage();
	clients=args;
	inQueue=new Vector(0,0);
	x  = -1.0;
	dx = 0.2;
	nextid = 1;
	sendid = 1;
	askForNextFrames();
     }
   
   protected void askForNextFrame(String proc)
     {
	x+=dx;
	sendMessage(proc,new JuliaRequest(actor,0.0,x,sendid++));
     }
   
   protected void askForNextFrames()
     {
	int i=1;
	while(i<clients.length)
	  {
	     askForNextFrame(clients[i]);
	     i++;
	  }
     }
   
   protected boolean showFrameWhenAppropriate(ImageMessage frame)
     {
	// if this is the one we were waiting for, show it
	if (nextid == frame.id)
	  {
	     nextid++;
	     painter.setImage(frame.convertToBufferedImage());
	     return true;
	  }
	return false;
     }
   
   protected void newFrameArrived(ImageMessage frame)
     {
	System.out.print("frame "+frame.id+" arrived, was expecting "+nextid+" ");
	if (!showFrameWhenAppropriate(frame))
	  inQueue.add(frame);
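	// showing one queued frame can unblock another one that sits earlier
	// in the queue, so keep scanning until nothing more can be shown
	boolean shown=true;
	while(shown)
	  {
	     shown=false;
	     Iterator it=inQueue.iterator();
	     while(it.hasNext())
	       {
		  ImageMessage j=(ImageMessage)it.next();
		  if (showFrameWhenAppropriate(j))
		    {
		       it.remove();
		       shown=true;
		    }
	       }
	  }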
     }
       	
   public void handleMessage(Message m)
     {
	if (m instanceof ImageMessage)
	  {
	     ImageMessage im=(ImageMessage)m;
	     newFrameArrived(im);
	     askForNextFrame(im.answerFrom);
	  }
	else 
	  super.handleMessage(m);
     }
   
   public static void main(String args[])
     {
	try
	  {
	     new FractalAnimator(args[0],args);
	  }
	catch (Exception e)
	  {
	     System.out.println(e);
	     e.printStackTrace();
	  }
     }
}

ImageMessage.java

import java.awt.*;
import java.awt.image.*;

public class ImageMessage
  implements Message
{
   public int xs;
   public int ys;
   public int id;
   int[] content;
   String answerFrom;
   public ImageMessage(BufferedImage img, int x, int y)
     {
	xs=x;
	ys=y;
	content=img.getRGB(0, 0, xs, ys, null, 0, xs);
     }
   public ImageMessage(BufferedImage img, int x, int y, String from, int i)
     {
	this(img,x,y);
	id=i;
	answerFrom=from;
     }
   public BufferedImage convertToBufferedImage()
     {
	BufferedImage image;
	image=new BufferedImage(xs, ys, BufferedImage.TYPE_INT_RGB);
	// im.setRGB(0,0,XS,YS,im.getImageData());
	image.setRGB(0,0,xs,ys,content,0,xs);
	return image;
     }
}

JuliaRequest.java

public class JuliaRequest 
  implements Message
{
   public String returnTo;
   public double cx,cy;
   public int id;
   public JuliaRequest(String returnImageTo, double ax, double ay, int i)
     {
	returnTo=returnImageTo;
	cx=ax;
	cy=ay;
	id=i;
     }
}

Message.java

import java.rmi.*;
import java.io.*;

interface Message extends Serializable
{
}
